From d703fb9fcb5e5fc450932a4986eb3c9cecbde9c1 Mon Sep 17 00:00:00 2001 From: "jinhu.wjh" Date: Thu, 16 Aug 2018 11:06:01 +0800 Subject: [PATCH 01/10] add select api --- pom.xml | 2 +- .../java/com/aliyun/oss/ClientException.java | 4 +- src/main/java/com/aliyun/oss/OSS.java | 41 +++++-- src/main/java/com/aliyun/oss/OSSClient.java | 10 ++ .../oss/common/parser/RequestMarshallers.java | 110 ++++++++++++----- .../aliyun/oss/event/ProgressEventType.java | 7 +- .../aliyun/oss/event/ProgressPublisher.java | 8 ++ .../com/aliyun/oss/internal/OSSHeaders.java | 7 ++ .../oss/internal/OSSObjectOperation.java | 114 ++++++++++++++---- .../oss/internal/RequestParameters.java | 3 + .../com/aliyun/oss/internal/SignUtils.java | 45 +------ .../aliyun/oss/model/AccessControlList.java | 2 +- .../java/com/aliyun/oss/model/Bucket.java | 2 - .../java/com/aliyun/oss/model/Callback.java | 6 +- .../model/CompleteMultipartUploadRequest.java | 2 +- .../oss/model/CreateLiveChannelResult.java | 2 - .../java/com/aliyun/oss/model/OSSObject.java | 2 - .../com/aliyun/oss/model/ObjectMetadata.java | 2 +- src/main/resources/versioninfo.properties | 2 +- src/samples/SelectObjectSample.java | 78 ++++++++++++ 20 files changed, 329 insertions(+), 120 deletions(-) create mode 100644 src/samples/SelectObjectSample.java diff --git a/pom.xml b/pom.xml index 834b91b5..51f7dabc 100644 --- a/pom.xml +++ b/pom.xml @@ -11,7 +11,7 @@ com.aliyun.oss aliyun-sdk-oss - 3.1.0-SNAPSHOT + 3.3.0-SNAPSHOT jar Aliyun OSS SDK for Java The Aliyun OSS SDK for Java used for accessing Aliyun Object Storage Service diff --git a/src/main/java/com/aliyun/oss/ClientException.java b/src/main/java/com/aliyun/oss/ClientException.java index dee50857..6f6b5b8e 100644 --- a/src/main/java/com/aliyun/oss/ClientException.java +++ b/src/main/java/com/aliyun/oss/ClientException.java @@ -154,7 +154,7 @@ public String getRequestId() { @Override public String getMessage() { - return getErrorMessage() + "\n[ErrorCode]: " + errorCode != null ? errorCode - : "" + "\n[RequestId]: " + requestId != null ? requestId : ""; + return getErrorMessage() + "\n[ErrorCode]: " + (errorCode != null ? errorCode + : "") + "\n[RequestId]: " + (requestId != null ? requestId : ""); } } diff --git a/src/main/java/com/aliyun/oss/OSS.java b/src/main/java/com/aliyun/oss/OSS.java index 2e8100bb..8dd43568 100644 --- a/src/main/java/com/aliyun/oss/OSS.java +++ b/src/main/java/com/aliyun/oss/OSS.java @@ -36,7 +36,7 @@ *

* Object Store Service (a.k.a OSS) is the massive, secure, low cost and highly * reliable public storage which could be accessed from anywhere at anytime via - * REST APIs, SDKs or web console.
+ * REST APIs, SDKs or web console.
* Developers could use OSS to create any services that need huge data storage * and access throughput, such as media sharing web apps, cloud storage service * or enterprise or personal data backup. @@ -270,8 +270,8 @@ public interface OSS { * @param bucketName * Bucket name. * @param tags - * The dictionary that contains the tags in the form of pairs + * The dictionary that contains the tags in the form of <key, + * value> pairs */ public void setBucketTagging(String bucketName, Map tags) throws OSSException, ClientException; @@ -281,8 +281,8 @@ public interface OSS { * @param bucketName * Bucket name. * @param tagSet - * {@link TagSet} instance that has the tags in the form of paris. + * {@link TagSet} instance that has the tags in the form of <key, + * value> paris. */ public void setBucketTagging(String bucketName, TagSet tagSet) throws OSSException, ClientException; @@ -612,6 +612,23 @@ public CopyObjectResult copyObject(String sourceBucketName, String sourceKey, St */ public OSSObject getObject(GetObjectRequest getObjectRequest) throws OSSException, ClientException; + /** + * Select the {@link OSSObject} from the bucket specified in + * {@link SelectObjectRequest} parameter + * @param selectObjectRequest + * A {@link SelectObjectRequest} instance which specifies the + * bucket name + * object key + * filter expression + * input serialization + * output serialization + * @return A {@link OSSObject} instance will be returned. The caller is + * responsible to close the connection after usage. + * @throws OSSException + * @throws ClientException + */ + public OSSObject selectObject(SelectObjectRequest selectObjectRequest) throws OSSException, ClientException; + /** * Gets the {@link OSSObject} from the signed Url. * @@ -679,6 +696,16 @@ public SimplifiedObjectMeta getSimplifiedObjectMeta(GenericRequest genericReques */ public ObjectMetadata getObjectMetadata(GenericRequest genericRequest) throws OSSException, ClientException; + /** + * Create select object metadata(create metadata if not exists or overwrite flag set in {@link CreateSelectObjectMetadataRequest}) + * + * @param createSelectObjectMetadataRequest + * {@link CreateSelectObjectMetadataRequest} create select object metadata request. + * + * @return The {@link SelectObjectMetadata} instance. + */ + public SelectObjectMetadata createSelectObjectMetadata(CreateSelectObjectMetadataRequest createSelectObjectMetadataRequest) throws OSSException, ClientException; + /** * Append the data to the appendable object specified in * {@link AppendObjectRequest}. It's not applicable to normal OSS object. @@ -1826,7 +1853,7 @@ public void setBucketStorageCapacity(SetBucketStorageCapacityRequest setBucketSt * * @param uploadFileRequest * A {@link UploadFileRequest} instance that specifies the bucket - * name, object key, file path ,part size (>100K) and thread + * name, object key, file path ,part size (> 100K) and thread * count (from 1 to 1000) and checkpoint file. * @return A {@link UploadFileRequest} instance which has the new uploaded * file's key, ETag, location. @@ -1848,7 +1875,7 @@ public void setBucketStorageCapacity(SetBucketStorageCapacityRequest setBucketSt * * @param downloadFileRequest * A {@link DownloadFileRequest} instance that specifies the - * bucket name, object key, file path, part size (>100K) and + * bucket name, object key, file path, part size (> 100K) and * thread count (from 1 to 1000) and checkpoint file. Also it * could have the ETag and ModifiedSince constraints. * @return A {@link DownloadFileResult} instance that has the diff --git a/src/main/java/com/aliyun/oss/OSSClient.java b/src/main/java/com/aliyun/oss/OSSClient.java index 0081c959..bee1e83b 100644 --- a/src/main/java/com/aliyun/oss/OSSClient.java +++ b/src/main/java/com/aliyun/oss/OSSClient.java @@ -561,6 +561,11 @@ public OSSObject getObject(URL signedUrl, Map requestHeaders) th return objectOperation.getObject(getObjectRequest); } + @Override + public OSSObject selectObject(SelectObjectRequest selectObjectRequest) throws OSSException, ClientException { + return objectOperation.selectObject(selectObjectRequest); + } + @Override public SimplifiedObjectMeta getSimplifiedObjectMeta(String bucketName, String key) throws OSSException, ClientException { @@ -578,6 +583,11 @@ public ObjectMetadata getObjectMetadata(String bucketName, String key) throws OS return this.getObjectMetadata(new GenericRequest(bucketName, key)); } + @Override + public SelectObjectMetadata createSelectObjectMetadata(CreateSelectObjectMetadataRequest createSelectObjectMetadataRequest) throws OSSException, ClientException { + return objectOperation.createSelectObjectMetadata(createSelectObjectMetadataRequest); + } + @Override public ObjectMetadata getObjectMetadata(GenericRequest genericRequest) throws OSSException, ClientException { return objectOperation.getObjectMetadata(genericRequest); diff --git a/src/main/java/com/aliyun/oss/common/parser/RequestMarshallers.java b/src/main/java/com/aliyun/oss/common/parser/RequestMarshallers.java index f4e1eb89..eff4c6a5 100644 --- a/src/main/java/com/aliyun/oss/common/parser/RequestMarshallers.java +++ b/src/main/java/com/aliyun/oss/common/parser/RequestMarshallers.java @@ -29,42 +29,15 @@ import com.aliyun.oss.ClientException; import com.aliyun.oss.common.comm.io.FixedLengthInputStream; +import com.aliyun.oss.common.utils.BinaryUtil; import com.aliyun.oss.common.utils.DateUtil; import com.aliyun.oss.internal.RequestParameters; +import com.aliyun.oss.model.*; import com.aliyun.oss.model.AddBucketReplicationRequest.ReplicationAction; -import com.aliyun.oss.model.BucketReferer; -import com.aliyun.oss.model.CompleteMultipartUploadRequest; -import com.aliyun.oss.model.CreateBucketRequest; -import com.aliyun.oss.model.CreateLiveChannelRequest; -import com.aliyun.oss.model.CreateUdfApplicationRequest; -import com.aliyun.oss.model.CreateUdfRequest; -import com.aliyun.oss.model.DeleteBucketCnameRequest; -import com.aliyun.oss.model.DeleteObjectsRequest; -import com.aliyun.oss.model.ImageProcess; -import com.aliyun.oss.model.LifecycleRule; import com.aliyun.oss.model.LifecycleRule.AbortMultipartUpload; import com.aliyun.oss.model.LifecycleRule.RuleStatus; import com.aliyun.oss.model.LifecycleRule.StorageTransition; -import com.aliyun.oss.model.LiveChannelTarget; -import com.aliyun.oss.model.PartETag; -import com.aliyun.oss.model.ProcessObjectRequest; -import com.aliyun.oss.model.PutBucketImageRequest; -import com.aliyun.oss.model.PutImageStyleRequest; -import com.aliyun.oss.model.ResizeUdfApplicationRequest; -import com.aliyun.oss.model.SetBucketCORSRequest; import com.aliyun.oss.model.SetBucketCORSRequest.CORSRule; -import com.aliyun.oss.model.DeleteBucketReplicationRequest; -import com.aliyun.oss.model.RoutingRule; -import com.aliyun.oss.model.AddBucketCnameRequest; -import com.aliyun.oss.model.SetBucketLifecycleRequest; -import com.aliyun.oss.model.SetBucketLoggingRequest; -import com.aliyun.oss.model.AddBucketReplicationRequest; -import com.aliyun.oss.model.SetBucketTaggingRequest; -import com.aliyun.oss.model.SetBucketWebsiteRequest; -import com.aliyun.oss.model.TagSet; -import com.aliyun.oss.model.UdfApplicationConfiguration; -import com.aliyun.oss.model.UpgradeUdfApplicationRequest; -import com.aliyun.oss.model.UserQos; /** * A collection of marshallers that marshall HTTP request into crossponding @@ -99,6 +72,9 @@ public final class RequestMarshallers { public static final ResizeUdfApplicationRequestMarshaller resizeUdfApplicationRequestMarshaller = new ResizeUdfApplicationRequestMarshaller(); public static final ProcessObjectRequestMarshaller processObjectRequestMarshaller = new ProcessObjectRequestMarshaller(); + public static final CreateSelectObjectMetadataRequestMarshaller createSelectObjectMetadataRequestMarshaller = new CreateSelectObjectMetadataRequestMarshaller(); + public static final SelectObjectRequestMarshaller selectObjectRequestMarshaller = new SelectObjectRequestMarshaller(); + public interface RequestMarshaller extends Marshaller { } @@ -501,6 +477,82 @@ public FixedLengthInputStream marshall(CompleteMultipartUploadRequest request) { } + public static final class CreateSelectObjectMetadataRequestMarshaller + implements RequestMarshaller2 { + + @Override + public byte[] marshall(CreateSelectObjectMetadataRequest request) { + StringBuffer xmlBody = new StringBuffer(); + InputSerialization inputSerialization = request.getInputSerialization(); + CSVFormat csvFormat = inputSerialization.getCsvInputFormat(); + xmlBody.append(""); + xmlBody.append(""); + xmlBody.append("" + inputSerialization.getCompressionType() + ""); + xmlBody.append(""); + xmlBody.append("" + BinaryUtil.toBase64String(csvFormat.getRecordDelimiter().getBytes()) + ""); + xmlBody.append("" + BinaryUtil.toBase64String(csvFormat.getFieldDelimiter().toString().getBytes()) + ""); + xmlBody.append("" + BinaryUtil.toBase64String(csvFormat.getQuoteChar().toString().getBytes()) + ""); + xmlBody.append(""); + xmlBody.append(""); + xmlBody.append("" + request.isOverwrite() + ""); + xmlBody.append(""); + + try { + return xmlBody.toString().getBytes(DEFAULT_CHARSET_NAME); + } catch (UnsupportedEncodingException e) { + throw new ClientException("Unsupported encoding " + e.getMessage(), e); + } + } + } + + public static final class SelectObjectRequestMarshaller implements RequestMarshaller2 { + + @Override + public byte[] marshall(SelectObjectRequest request) { + StringBuffer xmlBody = new StringBuffer(); + xmlBody.append(""); + + xmlBody.append("" + BinaryUtil.toBase64String(request.getExpression().getBytes()) + ""); + InputSerialization inputSerialization = request.getInputSerialization(); + CSVFormat csvInputFormat = inputSerialization.getCsvInputFormat(); + xmlBody.append(""); + xmlBody.append("" + inputSerialization.getCompressionType() + ""); + xmlBody.append(""); + xmlBody.append("" + csvInputFormat.getHeaderInfo() + ""); + xmlBody.append("" + BinaryUtil.toBase64String(csvInputFormat.getRecordDelimiter().getBytes()) + ""); + xmlBody.append("" + BinaryUtil.toBase64String(csvInputFormat.getFieldDelimiter().toString().getBytes()) + ""); + xmlBody.append("" + BinaryUtil.toBase64String(csvInputFormat.getQuoteChar().toString().getBytes()) + ""); + xmlBody.append("" + BinaryUtil.toBase64String(csvInputFormat.getCommentChar().toString().getBytes()) + ""); + + if (request.getLineRange() != null) { + xmlBody.append("" + request.lineRangeToString(request.getLineRange()) + ""); + } + if (request.getSplitRange() != null) { + xmlBody.append("" + request.splitRangeToString(request.getSplitRange()) + ""); + } + xmlBody.append(""); + xmlBody.append(""); + OutputSerialization outputSerialization = request.getOutputSerialization(); + CSVFormat csvOutputFormat = outputSerialization.getCsvOutputFormat(); + xmlBody.append(""); + xmlBody.append(""); + xmlBody.append("" + BinaryUtil.toBase64String(csvOutputFormat.getRecordDelimiter().getBytes()) + ""); + xmlBody.append("" + BinaryUtil.toBase64String(csvOutputFormat.getFieldDelimiter().toString().getBytes()) + ""); + xmlBody.append("" + BinaryUtil.toBase64String(csvOutputFormat.getQuoteChar().toString().getBytes()) + ""); + xmlBody.append("" + outputSerialization.isKeepAllColumns() + ""); + xmlBody.append(""); + xmlBody.append("" + outputSerialization.isOutputRawData() + ""); + xmlBody.append(""); + xmlBody.append(""); + + try { + return xmlBody.toString().getBytes(DEFAULT_CHARSET_NAME); + } catch (UnsupportedEncodingException e) { + throw new ClientException("Unsupported encoding " + e.getMessage(), e); + } + } + } + public static final class DeleteObjectsRequestMarshaller implements RequestMarshaller2 { @Override diff --git a/src/main/java/com/aliyun/oss/event/ProgressEventType.java b/src/main/java/com/aliyun/oss/event/ProgressEventType.java index d3c5feba..4ab79e13 100644 --- a/src/main/java/com/aliyun/oss/event/ProgressEventType.java +++ b/src/main/java/com/aliyun/oss/event/ProgressEventType.java @@ -44,5 +44,10 @@ public enum ProgressEventType { /** * Transfer events. */ - TRANSFER_PREPARING_EVENT, TRANSFER_STARTED_EVENT, TRANSFER_COMPLETED_EVENT, TRANSFER_FAILED_EVENT, TRANSFER_CANCELED_EVENT, TRANSFER_PART_STARTED_EVENT, TRANSFER_PART_COMPLETED_EVENT, TRANSFER_PART_FAILED_EVENT; + TRANSFER_PREPARING_EVENT, TRANSFER_STARTED_EVENT, TRANSFER_COMPLETED_EVENT, TRANSFER_FAILED_EVENT, TRANSFER_CANCELED_EVENT, TRANSFER_PART_STARTED_EVENT, TRANSFER_PART_COMPLETED_EVENT, TRANSFER_PART_FAILED_EVENT, + + /** + * Select object events. + */ + SELECT_STARTED_EVENT, SELECT_SCAN_EVENT, SELECT_COMPLETED_EVENT, SELECT_FAILED_EVENT } diff --git a/src/main/java/com/aliyun/oss/event/ProgressPublisher.java b/src/main/java/com/aliyun/oss/event/ProgressPublisher.java index 9d83dee4..44173f1f 100644 --- a/src/main/java/com/aliyun/oss/event/ProgressPublisher.java +++ b/src/main/java/com/aliyun/oss/event/ProgressPublisher.java @@ -33,6 +33,14 @@ public static void publishProgress(final ProgressListener listener, final Progre listener.progressChanged(new ProgressEvent(eventType)); } + public static void publishSelectProgress(final ProgressListener listener, final ProgressEventType eventType, + final long scannedBytes) { + if (listener == ProgressListener.NOOP || listener == null || eventType == null) { + return; + } + listener.progressChanged(new ProgressEvent(eventType, scannedBytes)); + } + public static void publishRequestContentLength(final ProgressListener listener, final long bytes) { publishByteCountEvent(listener, REQUEST_CONTENT_LENGTH_EVENT, bytes); } diff --git a/src/main/java/com/aliyun/oss/internal/OSSHeaders.java b/src/main/java/com/aliyun/oss/internal/OSSHeaders.java index bc2a0dca..768ac646 100644 --- a/src/main/java/com/aliyun/oss/internal/OSSHeaders.java +++ b/src/main/java/com/aliyun/oss/internal/OSSHeaders.java @@ -80,4 +80,11 @@ public interface OSSHeaders extends HttpHeaders { static final String OSS_ONGOING_RESTORE = "ongoing-request=\"true\""; static final String OSS_BUCKET_REGION = "x-oss-bucket-region"; + + static final String OSS_SELECT_PREFIX = "x-oss-select"; + static final String OSS_SELECT_CSV_ROWS = OSS_SELECT_PREFIX + "-csv-rows"; + static final String OSS_SELECT_CSV_SPLITS = OSS_SELECT_PREFIX + "-csv-splits"; + static final String OSS_SELECT_INPUT_LINE_RANGE = OSS_SELECT_PREFIX + "-line-range"; + static final String OSS_SELECT_INPUT_SPLIT_RANGE = OSS_SELECT_PREFIX + "-split-range"; + } diff --git a/src/main/java/com/aliyun/oss/internal/OSSObjectOperation.java b/src/main/java/com/aliyun/oss/internal/OSSObjectOperation.java index 33dcac54..6669e4f0 100644 --- a/src/main/java/com/aliyun/oss/internal/OSSObjectOperation.java +++ b/src/main/java/com/aliyun/oss/internal/OSSObjectOperation.java @@ -19,8 +19,7 @@ package com.aliyun.oss.internal; -import static com.aliyun.oss.common.parser.RequestMarshallers.deleteObjectsRequestMarshaller; -import static com.aliyun.oss.common.parser.RequestMarshallers.processObjectRequestMarshaller; +import static com.aliyun.oss.common.parser.RequestMarshallers.*; import static com.aliyun.oss.common.utils.CodingUtils.assertParameterNotNull; import static com.aliyun.oss.common.utils.CodingUtils.assertStringNotNullOrEmpty; import static com.aliyun.oss.common.utils.CodingUtils.assertTrue; @@ -32,6 +31,8 @@ import static com.aliyun.oss.event.ProgressPublisher.publishProgress; import static com.aliyun.oss.internal.OSSConstants.DEFAULT_BUFFER_SIZE; import static com.aliyun.oss.internal.OSSConstants.DEFAULT_CHARSET_NAME; +import static com.aliyun.oss.internal.OSSHeaders.OSS_SELECT_CSV_ROWS; +import static com.aliyun.oss.internal.OSSHeaders.OSS_SELECT_CSV_SPLITS; import static com.aliyun.oss.internal.OSSUtils.OSS_RESOURCE_MANAGER; import static com.aliyun.oss.internal.OSSUtils.addDateHeader; import static com.aliyun.oss.internal.OSSUtils.addHeader; @@ -76,6 +77,7 @@ import java.util.Map; import java.util.zip.CheckedInputStream; +import com.aliyun.oss.model.*; import org.apache.http.HttpStatus; import com.aliyun.oss.ClientException; @@ -102,28 +104,6 @@ import com.aliyun.oss.event.ProgressInputStream; import com.aliyun.oss.event.ProgressListener; import com.aliyun.oss.internal.ResponseParsers.GetObjectResponseParser; -import com.aliyun.oss.model.AppendObjectRequest; -import com.aliyun.oss.model.AppendObjectResult; -import com.aliyun.oss.model.CannedAccessControlList; -import com.aliyun.oss.model.CopyObjectRequest; -import com.aliyun.oss.model.CopyObjectResult; -import com.aliyun.oss.model.CreateSymlinkRequest; -import com.aliyun.oss.model.DeleteObjectsRequest; -import com.aliyun.oss.model.DeleteObjectsResult; -import com.aliyun.oss.model.GenericRequest; -import com.aliyun.oss.model.GenericResult; -import com.aliyun.oss.model.GetObjectRequest; -import com.aliyun.oss.model.HeadObjectRequest; -import com.aliyun.oss.model.OSSObject; -import com.aliyun.oss.model.OSSSymlink; -import com.aliyun.oss.model.ObjectAcl; -import com.aliyun.oss.model.ObjectMetadata; -import com.aliyun.oss.model.ProcessObjectRequest; -import com.aliyun.oss.model.PutObjectRequest; -import com.aliyun.oss.model.PutObjectResult; -import com.aliyun.oss.model.RestoreObjectResult; -import com.aliyun.oss.model.SetObjectAclRequest; -import com.aliyun.oss.model.SimplifiedObjectMeta; /** * Object operation. @@ -215,6 +195,92 @@ public AppendObjectResult appendObject(AppendObjectRequest appendObjectRequest) return result; } + public SelectObjectMetadata createSelectObjectMetadata(CreateSelectObjectMetadataRequest createSelectObjectMetadataRequest) throws OSSException, ClientException { + String process = createSelectObjectMetadataRequest.getProcess(); + assertParameterNotNull(process, "process"); + + GenericRequest genericRequest = new GenericRequest( + createSelectObjectMetadataRequest.getBucketName(), createSelectObjectMetadataRequest.getKey()); + genericRequest.getParameters().put(RequestParameters.SUBRESOURCE_PROCESS, process); + + String bucketName = genericRequest.getBucketName(); + String key = genericRequest.getKey(); + + assertParameterNotNull(bucketName, "bucketName"); + assertParameterNotNull(key, "key"); + ensureBucketNameValid(bucketName); + ensureObjectKeyValid(key); + + byte[] content = createSelectObjectMetadataRequestMarshaller.marshall(createSelectObjectMetadataRequest); + RequestMessage request = new OSSRequestMessageBuilder(getInnerClient()).setEndpoint(getEndpoint()) + .setMethod(HttpMethod.POST).setInputSize(content.length).setInputStream(new ByteArrayInputStream(content)) + .setBucket(bucketName).setKey(key).setOriginalRequest(genericRequest) + .build(); + + ObjectMetadata objectMetadata = doOperation(request, getObjectMetadataResponseParser, bucketName, key, true, null, null); + SelectObjectMetadata selectObjectMetadata = new SelectObjectMetadata(objectMetadata); + selectObjectMetadata.setCsvObjectMetadata( + new SelectObjectMetadata.CsvObjectMetadata() + .withTotalLines(Integer.parseInt(objectMetadata.getRawMetadata().get(OSS_SELECT_CSV_ROWS).toString())) + .withSplits(Integer.parseInt(objectMetadata.getRawMetadata().get(OSS_SELECT_CSV_SPLITS).toString()))); + return selectObjectMetadata; + } + + /** + * Select an object from oss. + */ + public OSSObject selectObject(SelectObjectRequest selectObjectRequest) throws OSSException, ClientException { + assertParameterNotNull(selectObjectRequest, "selectObjectRequest"); + String bucketName = selectObjectRequest.getBucketName(); + String key = selectObjectRequest.getKey(); + assertParameterNotNull(bucketName, "bucketName"); + assertParameterNotNull(key, "key"); + ensureBucketNameValid(bucketName); + ensureObjectKeyValid(key); + Map headers = new HashMap(); + populateGetObjectRequestHeaders(selectObjectRequest, headers); + + Map params = new HashMap(); + populateResponseHeaderParameters(params, selectObjectRequest.getResponseHeaders()); + String process = selectObjectRequest.getProcess(); + assertParameterNotNull(process, "process"); + + params.put(RequestParameters.SUBRESOURCE_PROCESS, process); + + SelectObjectRequest.ExpressionType expressionType = selectObjectRequest.getExpressionType(); + if (expressionType != SelectObjectRequest.ExpressionType.SQL) { + throw new IllegalArgumentException("Select object only support sql expression"); + } + if (selectObjectRequest.getExpression() == null) { + throw new IllegalArgumentException("Select expression is null"); + } + if (selectObjectRequest.getLineRange() != null && selectObjectRequest.getSplitRange() != null) { + throw new IllegalArgumentException("Line range and split range of select request should not both set"); + } + + byte[] content = selectObjectRequestMarshaller.marshall(selectObjectRequest); + + headers.put(HttpHeaders.CONTENT_MD5, BinaryUtil.toBase64String(BinaryUtil.calculateMd5(content))); + RequestMessage request = new OSSRequestMessageBuilder(getInnerClient()).setEndpoint(getEndpoint()) + .setMethod(HttpMethod.POST).setBucket(bucketName).setKey(key).setHeaders(headers) + .setInputSize(content.length).setInputStream(new ByteArrayInputStream(content)) + .setParameters(params).setOriginalRequest(selectObjectRequest).build(); + //select progress listener(scanned bytes) + final ProgressListener selectProgressListener = selectObjectRequest.getSelectProgressListener(); + try { + OSSObject ossObject = doOperation(request, new GetObjectResponseParser(bucketName, key), bucketName, key, true); + publishProgress(selectProgressListener, ProgressEventType.SELECT_STARTED_EVENT); + InputStream inputStream = ossObject.getObjectContent(); + if (!selectObjectRequest.getOutputSerialization().isOutputRawData()) { + ossObject.setObjectContent(new SelectInputStream(inputStream, selectProgressListener)); + } + return ossObject; + } catch (RuntimeException e) { + publishProgress(selectProgressListener, ProgressEventType.SELECT_FAILED_EVENT); + throw e; + } + } + /** * Pull an object from oss. */ diff --git a/src/main/java/com/aliyun/oss/internal/RequestParameters.java b/src/main/java/com/aliyun/oss/internal/RequestParameters.java index 405236f6..05f80830 100644 --- a/src/main/java/com/aliyun/oss/internal/RequestParameters.java +++ b/src/main/java/com/aliyun/oss/internal/RequestParameters.java @@ -49,6 +49,9 @@ public final class RequestParameters { public static final String SUBRESOURCE_END_TIME = "endTime"; public static final String SUBRESOURCE_PROCESS_CONF = "processConfiguration"; public static final String SUBRESOURCE_PROCESS = "x-oss-process"; + public static final String SUBRESOURCE_CSV_SELECT = "csv/select"; + public static final String SUBRESOURCE_CSV_META = "csv/meta"; + public static final String SUBRESOURCE_SQL = "sql"; public static final String SUBRESOURCE_SYMLINK = "symlink"; public static final String SUBRESOURCE_STAT = "stat"; public static final String SUBRESOURCE_RESTORE = "restore"; diff --git a/src/main/java/com/aliyun/oss/internal/SignUtils.java b/src/main/java/com/aliyun/oss/internal/SignUtils.java index c3515754..3202078e 100644 --- a/src/main/java/com/aliyun/oss/internal/SignUtils.java +++ b/src/main/java/com/aliyun/oss/internal/SignUtils.java @@ -20,48 +20,7 @@ package com.aliyun.oss.internal; import static com.aliyun.oss.common.utils.CodingUtils.assertTrue; -import static com.aliyun.oss.internal.RequestParameters.PART_NUMBER; -import static com.aliyun.oss.internal.RequestParameters.POSITION; -import static com.aliyun.oss.internal.RequestParameters.SECURITY_TOKEN; -import static com.aliyun.oss.internal.RequestParameters.STYLE_NAME; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_ACL; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_APPEND; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_CORS; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_DELETE; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_IMG; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_LIFECYCLE; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_LOCATION; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_LOGGING; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_REFERER; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_STYLE; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_UPLOADS; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_WEBSITE; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_TAGGING; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_REPLICATION; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_REPLICATION_PROGRESS; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_REPLICATION_LOCATION; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_CNAME; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_BUCKET_INFO; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_COMP; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_OBJECTMETA; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_LIVE; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_STATUS; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_VOD; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_START_TIME; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_END_TIME; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_PROCESS_CONF; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_PROCESS; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_SYMLINK; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_STAT; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_UDF; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_UDF_NAME; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_UDF_IMAGE; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_UDF_IMAGE_DESC; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_UDF_APPLICATION; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_UDF_LOG; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_RESTORE; -import static com.aliyun.oss.internal.RequestParameters.UPLOAD_ID; -import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_QOS; +import static com.aliyun.oss.internal.RequestParameters.*; import static com.aliyun.oss.model.ResponseHeaderOverrides.RESPONSE_HEADER_CACHE_CONTROL; import static com.aliyun.oss.model.ResponseHeaderOverrides.RESPONSE_HEADER_CONTENT_DISPOSITION; import static com.aliyun.oss.model.ResponseHeaderOverrides.RESPONSE_HEADER_CONTENT_ENCODING; @@ -93,7 +52,7 @@ public class SignUtils { SUBRESOURCE_STATUS, SUBRESOURCE_VOD, SUBRESOURCE_START_TIME, SUBRESOURCE_END_TIME, SUBRESOURCE_PROCESS, SUBRESOURCE_PROCESS_CONF, SUBRESOURCE_SYMLINK, SUBRESOURCE_STAT, SUBRESOURCE_UDF, SUBRESOURCE_UDF_NAME, SUBRESOURCE_UDF_IMAGE, SUBRESOURCE_UDF_IMAGE_DESC, SUBRESOURCE_UDF_APPLICATION, SUBRESOURCE_UDF_LOG, - SUBRESOURCE_RESTORE, }); + SUBRESOURCE_RESTORE, SUBRESOURCE_CSV_SELECT, SUBRESOURCE_CSV_META, SUBRESOURCE_SQL}); public static String buildCanonicalString(String method, String resourcePath, RequestMessage request, String expires) { diff --git a/src/main/java/com/aliyun/oss/model/AccessControlList.java b/src/main/java/com/aliyun/oss/model/AccessControlList.java index 00ce93ed..f1311797 100644 --- a/src/main/java/com/aliyun/oss/model/AccessControlList.java +++ b/src/main/java/com/aliyun/oss/model/AccessControlList.java @@ -26,7 +26,7 @@ /** * The class encapsulates the access control list (ACL) information of OSS. It - * includes an owner and a group of <{@link Grantee},{@link Permission}> pair. + * includes an owner and a group of <{@link Grantee},{@link Permission}> pair. */ public class AccessControlList extends GenericResult implements Serializable { diff --git a/src/main/java/com/aliyun/oss/model/Bucket.java b/src/main/java/com/aliyun/oss/model/Bucket.java index 86e128e5..39889ec6 100644 --- a/src/main/java/com/aliyun/oss/model/Bucket.java +++ b/src/main/java/com/aliyun/oss/model/Bucket.java @@ -30,14 +30,12 @@ * mapped to one or multiple buckets. An OSS account could only create up to 10 * bucket. And there's no limit on the files count or size under a bucket. *

- *

* Bucket naming rules: *

- *

*/ public class Bucket extends GenericResult { diff --git a/src/main/java/com/aliyun/oss/model/Callback.java b/src/main/java/com/aliyun/oss/model/Callback.java index b8be3428..eeb18a83 100644 --- a/src/main/java/com/aliyun/oss/model/Callback.java +++ b/src/main/java/com/aliyun/oss/model/Callback.java @@ -88,7 +88,7 @@ public String getCallbackBody() { /** * Sets the callback body.For example: - * key=$(key)&etag=$(etag)&my_var=$(x:my_var). It supports the OSS system + * key=$(key) & etag=$(etag) & my_var=$(x:my_var). It supports the OSS system * variable, custom defined variable or constant and custom defined * variable's callbackVar. * @@ -124,12 +124,12 @@ public Map getCallbackVar() { /** * Sets user customized parameter(s). * - * Customized parameter is a Map instance. In the callback + * Customized parameter is a Map<key,value> instance. In the callback * request, OSS would put these parameters into the post body. The keys must * start with "x:", such as x:my_var. * * @param callbackVar - * A {@link Map} instance that stores the pairs. + * A {@link Map} instance that stores the <key, value> pairs. */ public void setCallbackVar(Map callbackVar) { this.callbackVar.clear(); diff --git a/src/main/java/com/aliyun/oss/model/CompleteMultipartUploadRequest.java b/src/main/java/com/aliyun/oss/model/CompleteMultipartUploadRequest.java index 512df114..06f51124 100644 --- a/src/main/java/com/aliyun/oss/model/CompleteMultipartUploadRequest.java +++ b/src/main/java/com/aliyun/oss/model/CompleteMultipartUploadRequest.java @@ -116,7 +116,7 @@ public CannedAccessControlList getObjectACL() { /** * Sets Object ACL。 * - * @param Object + * @param cannedACL * ACL。 */ public void setObjectACL(CannedAccessControlList cannedACL) { diff --git a/src/main/java/com/aliyun/oss/model/CreateLiveChannelResult.java b/src/main/java/com/aliyun/oss/model/CreateLiveChannelResult.java index 8e145f77..54d17ca3 100644 --- a/src/main/java/com/aliyun/oss/model/CreateLiveChannelResult.java +++ b/src/main/java/com/aliyun/oss/model/CreateLiveChannelResult.java @@ -42,8 +42,6 @@ public List getPlayUrls() { /** * Gets the playback urls. - * - * @return The playback urls. */ public void setPlayUrls(List playUrls) { this.playUrls = playUrls; diff --git a/src/main/java/com/aliyun/oss/model/OSSObject.java b/src/main/java/com/aliyun/oss/model/OSSObject.java index ed6c6df5..f784ed3d 100644 --- a/src/main/java/com/aliyun/oss/model/OSSObject.java +++ b/src/main/java/com/aliyun/oss/model/OSSObject.java @@ -33,14 +33,12 @@ * content. The user metadata is a dictionary of key-value entries to store some * custom data about the object. *

- *

* Object naming rules *

    *
  • use UTF-8 encoding
  • *
  • Length is between 1 to 1023
  • *
  • Could not have slash or backslash
  • *
- *

* */ public class OSSObject extends GenericResult implements Closeable { diff --git a/src/main/java/com/aliyun/oss/model/ObjectMetadata.java b/src/main/java/com/aliyun/oss/model/ObjectMetadata.java index a26e1ec9..f00e0069 100644 --- a/src/main/java/com/aliyun/oss/model/ObjectMetadata.java +++ b/src/main/java/com/aliyun/oss/model/ObjectMetadata.java @@ -38,7 +38,7 @@ public class ObjectMetadata { private Map userMetadata = new HashMap(); // Other non-custom metadata. - private Map metadata = new HashMap(); + protected Map metadata = new HashMap(); public static final String AES_256_SERVER_SIDE_ENCRYPTION = "AES256"; diff --git a/src/main/resources/versioninfo.properties b/src/main/resources/versioninfo.properties index 5b2a680f..14103c3f 100644 --- a/src/main/resources/versioninfo.properties +++ b/src/main/resources/versioninfo.properties @@ -1 +1 @@ -version=3.1.0 +version=3.3.0 diff --git a/src/samples/SelectObjectSample.java b/src/samples/SelectObjectSample.java new file mode 100644 index 00000000..60165710 --- /dev/null +++ b/src/samples/SelectObjectSample.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package samples; + +import com.aliyun.oss.event.ProgressEvent; +import com.aliyun.oss.event.ProgressListener; +import com.aliyun.oss.model.*; +import com.aliyun.oss.OSS; +import com.aliyun.oss.OSSClientBuilder; + +import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.FileOutputStream; + +/** + * Examples of create select object metadata and select object. + * + */ +public class SelectObjectSample { + private static String endpoint = ""; + private static String accessKeyId = ""; + private static String accessKeySecret = ""; + private static String bucketName = ""; + private static String key = ""; + + public static void main(String[] args) throws Exception { + OSS client = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret); + String content = "name,school,company,age\r\n" + + "Lora Francis,School A,Staples Inc,27\r\n" + + "Eleanor Little,School B,\"Conectiv, Inc\",43\r\n" + + "Rosie Hughes,School C,Western Gas Resources Inc,44\r\n" + + "Lawrence Ross,School D,MetLife Inc.,24"; + + client.putObject(bucketName, key, new ByteArrayInputStream(content.getBytes())); + + SelectObjectMetadata selectObjectMetadata = client.createSelectObjectMetadata( + new CreateSelectObjectMetadataRequest(bucketName, key) + .withInputSerialization( + new InputSerialization().withCsvInputFormat( + new CSVFormat().withHeaderInfo(CSVFormat.Header.Use).withRecordDelimiter("\r\n")))); + System.out.println(selectObjectMetadata.getCsvObjectMetadata().getTotalLines()); + System.out.println(selectObjectMetadata.getCsvObjectMetadata().getSplits()); + + SelectObjectRequest selectObjectRequest = + new SelectObjectRequest(bucketName, key) + .withInputSerialization( + new InputSerialization().withCsvInputFormat( + new CSVFormat().withHeaderInfo(CSVFormat.Header.Use).withRecordDelimiter("\r\n"))) + .withOutputSerialization(new OutputSerialization().withCsvOutputFormat(new CSVFormat())); + selectObjectRequest.setExpression("select * from ossobject where _4 > 40"); + OSSObject ossObject = client.selectObject(selectObjectRequest); + // read object content from ossObject + BufferedOutputStream outputStream = new BufferedOutputStream(new FileOutputStream("result.data")); + byte[] buffer = new byte[1024]; + int bytesRead; + while ((bytesRead = ossObject.getObjectContent().read(buffer)) != -1) { + outputStream.write(buffer, 0, bytesRead); + } + outputStream.close(); + } +} \ No newline at end of file From 29e67c481c1685e27569c8e79dfb5e7f0a635162 Mon Sep 17 00:00:00 2001 From: "jinhu.wjh" Date: Thu, 16 Aug 2018 11:24:10 +0800 Subject: [PATCH 02/10] add select api --- .../java/com/aliyun/oss/model/CSVFormat.java | 90 +++++++ .../com/aliyun/oss/model/CompressionType.java | 6 + .../CreateSelectObjectMetadataRequest.java | 54 ++++ .../aliyun/oss/model/InputSerialization.java | 37 +++ .../aliyun/oss/model/OutputSerialization.java | 79 ++++++ .../aliyun/oss/model/SelectInputStream.java | 178 +++++++++++++ .../oss/model/SelectObjectMetadata.java | 62 +++++ .../aliyun/oss/model/SelectObjectRequest.java | 239 ++++++++++++++++++ .../integrationtests/SelectObjectTest.java | 95 +++++++ 9 files changed, 840 insertions(+) create mode 100644 src/main/java/com/aliyun/oss/model/CSVFormat.java create mode 100644 src/main/java/com/aliyun/oss/model/CompressionType.java create mode 100644 src/main/java/com/aliyun/oss/model/CreateSelectObjectMetadataRequest.java create mode 100644 src/main/java/com/aliyun/oss/model/InputSerialization.java create mode 100644 src/main/java/com/aliyun/oss/model/OutputSerialization.java create mode 100644 src/main/java/com/aliyun/oss/model/SelectInputStream.java create mode 100644 src/main/java/com/aliyun/oss/model/SelectObjectMetadata.java create mode 100644 src/main/java/com/aliyun/oss/model/SelectObjectRequest.java create mode 100644 src/test/java/com/aliyun/oss/integrationtests/SelectObjectTest.java diff --git a/src/main/java/com/aliyun/oss/model/CSVFormat.java b/src/main/java/com/aliyun/oss/model/CSVFormat.java new file mode 100644 index 00000000..06304b7b --- /dev/null +++ b/src/main/java/com/aliyun/oss/model/CSVFormat.java @@ -0,0 +1,90 @@ +package com.aliyun.oss.model; + +import java.io.Serializable; + +public class CSVFormat implements Serializable { + public static enum Header { + None, // there is no csv header + Ignore, // we should ignore csv header and should not use csv header in select sql + Use // we can use csv header in select sql + } + //Define the first line of input. Valid values: None, Ignore, Use. + private Header headerInfo = Header.Ignore; + + //Define the delimiter for records + private String recordDelimiter = "\n"; + + //Define the comment char, this is a single char, so getter will return the first char + private String commentChar = "#"; + + //Define the delimiter for fields, this is a single char, so getter will return the first char + private String fieldDelimiter = ","; + + //Define the quote char, this is a single char, so getter will return the first char + private String quoteChar = "\""; + + public String getHeaderInfo() { + return headerInfo.name(); + } + + public void setHeaderInfo(Header headerInfo) { + this.headerInfo = headerInfo; + } + + public CSVFormat withHeaderInfo(Header headerInfo) { + setHeaderInfo(headerInfo); + return this; + } + + public String getRecordDelimiter() { + return recordDelimiter; + } + + public void setRecordDelimiter(String recordDelimiter) { + this.recordDelimiter = recordDelimiter; + } + + public CSVFormat withRecordDelimiter(String recordDelimiter) { + setRecordDelimiter(recordDelimiter); + return this; + } + + public Character getCommentChar() { + return commentChar == null || commentChar.isEmpty() ? null : commentChar.charAt(0); + } + + public void setCommentChar(String commentChar) { + this.commentChar = commentChar; + } + + public CSVFormat withCommentChar(String commentChar) { + setCommentChar(commentChar); + return this; + } + + public Character getFieldDelimiter() { + return fieldDelimiter == null || fieldDelimiter.isEmpty() ? null : fieldDelimiter.charAt(0); + } + + public void setFieldDelimiter(String fieldDelimiter) { + this.fieldDelimiter = fieldDelimiter; + } + + public CSVFormat withFieldDelimiter(String fieldDelimiter) { + setFieldDelimiter(fieldDelimiter); + return this; + } + + public Character getQuoteChar() { + return quoteChar == null || quoteChar.isEmpty() ? null : quoteChar.charAt(0); + } + + public void setQuoteChar(String quoteChar) { + this.quoteChar = quoteChar; + } + + public CSVFormat withQuoteChar(String quoteChar) { + setQuoteChar(quoteChar); + return this; + } +} diff --git a/src/main/java/com/aliyun/oss/model/CompressionType.java b/src/main/java/com/aliyun/oss/model/CompressionType.java new file mode 100644 index 00000000..d384873c --- /dev/null +++ b/src/main/java/com/aliyun/oss/model/CompressionType.java @@ -0,0 +1,6 @@ +package com.aliyun.oss.model; + +public enum CompressionType { + NONE, + GZIP +} diff --git a/src/main/java/com/aliyun/oss/model/CreateSelectObjectMetadataRequest.java b/src/main/java/com/aliyun/oss/model/CreateSelectObjectMetadataRequest.java new file mode 100644 index 00000000..fda0ab0b --- /dev/null +++ b/src/main/java/com/aliyun/oss/model/CreateSelectObjectMetadataRequest.java @@ -0,0 +1,54 @@ +package com.aliyun.oss.model; + +import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_CSV_META; + +public class CreateSelectObjectMetadataRequest extends HeadObjectRequest { + private String process; + private InputSerialization inputSerialization = new InputSerialization(); + private boolean overwrite; + + public CreateSelectObjectMetadataRequest(String bucketName, String key) { + super(bucketName, key); + setProcess(SUBRESOURCE_CSV_META); + setOverwrite(false); + } + + public String getProcess() { + return process; + } + + public void setProcess(String process) { + this.process = process; + } + + public CreateSelectObjectMetadataRequest withProcess(String process) { + setProcess(process); + return this; + } + + public InputSerialization getInputSerialization() { + return inputSerialization; + } + + public void setInputSerialization(InputSerialization inputSerialization) { + this.inputSerialization = inputSerialization; + } + + public CreateSelectObjectMetadataRequest withInputSerialization(InputSerialization inputSerialization) { + setInputSerialization(inputSerialization); + return this; + } + + public boolean isOverwrite() { + return overwrite; + } + + public void setOverwrite(boolean overwrite) { + this.overwrite = overwrite; + } + + public CreateSelectObjectMetadataRequest withOverwrite(boolean overwrite) { + setOverwrite(overwrite); + return this; + } +} diff --git a/src/main/java/com/aliyun/oss/model/InputSerialization.java b/src/main/java/com/aliyun/oss/model/InputSerialization.java new file mode 100644 index 00000000..6208b3f2 --- /dev/null +++ b/src/main/java/com/aliyun/oss/model/InputSerialization.java @@ -0,0 +1,37 @@ +package com.aliyun.oss.model; + +import java.io.Serializable; + +/** + * Define input serialization of the select object operations. + */ +public class InputSerialization implements Serializable { + private CSVFormat csvInputFormat = new CSVFormat(); + private String compressionType = CompressionType.NONE.name(); + + public CSVFormat getCsvInputFormat() { + return csvInputFormat; + } + + public void setCsvInputFormat(CSVFormat csvInputFormat) { + this.csvInputFormat = csvInputFormat; + } + + public InputSerialization withCsvInputFormat(CSVFormat csvFormat) { + setCsvInputFormat(csvFormat); + return this; + } + + public String getCompressionType() { + return compressionType; + } + + public void setCompressionType(CompressionType compressionType) { + this.compressionType = compressionType.name(); + } + + public InputSerialization withCompressionType(CompressionType compressionType) { + setCompressionType(compressionType); + return this; + } +} diff --git a/src/main/java/com/aliyun/oss/model/OutputSerialization.java b/src/main/java/com/aliyun/oss/model/OutputSerialization.java new file mode 100644 index 00000000..50b04532 --- /dev/null +++ b/src/main/java/com/aliyun/oss/model/OutputSerialization.java @@ -0,0 +1,79 @@ +package com.aliyun.oss.model; + +import java.io.Serializable; + +/** + * Define how to output results of the select object operations. + */ +public class OutputSerialization implements Serializable { + private CSVFormat csvOutputFormat = new CSVFormat(); + private String compressionType = CompressionType.NONE.name(); + private boolean keepAllColumns = false; + private boolean crcEnabled = false; + private boolean outputRawData = false; + + public CSVFormat getCsvOutputFormat() { + return csvOutputFormat; + } + + public void setCsvOutputFormat(CSVFormat csvOutputFormat) { + this.csvOutputFormat = csvOutputFormat; + } + + public OutputSerialization withCsvOutputFormat(CSVFormat csvFormat) { + setCsvOutputFormat(csvFormat); + return this; + } + + public String getCompressionType() { + return compressionType; + } + + public void setCompressionType(CompressionType compressionType) { + this.compressionType = compressionType.name(); + } + + public OutputSerialization withCompressionType(CompressionType compressionType) { + setCompressionType(compressionType); + return this; + } + + public boolean isKeepAllColumns() { + return keepAllColumns; + } + + public void setKeepAllColumns(boolean keepAllColumns) { + this.keepAllColumns = keepAllColumns; + } + + public OutputSerialization withKeepAllColumns(boolean keepAllColumns) { + setKeepAllColumns(keepAllColumns); + return this; + } + + public boolean isCrcEnabled() { + return crcEnabled; + } + + public void setCrcEnabled(boolean crcEnabled) { + this.crcEnabled = crcEnabled; + } + + public OutputSerialization withCrcEnabled(boolean crcEnabled) { + setCrcEnabled(crcEnabled); + return this; + } + + public boolean isOutputRawData() { + return outputRawData; + } + + public void setOutputRawData(boolean outputRawData) { + this.outputRawData = outputRawData; + } + + public OutputSerialization withOutputRawData(boolean outputRawData) { + setOutputRawData(outputRawData); + return this; + } +} diff --git a/src/main/java/com/aliyun/oss/model/SelectInputStream.java b/src/main/java/com/aliyun/oss/model/SelectInputStream.java new file mode 100644 index 00000000..673a4dd3 --- /dev/null +++ b/src/main/java/com/aliyun/oss/model/SelectInputStream.java @@ -0,0 +1,178 @@ +package com.aliyun.oss.model; + +import com.aliyun.oss.event.ProgressEventType; +import com.aliyun.oss.event.ProgressListener; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +import static com.aliyun.oss.event.ProgressPublisher.publishProgress; +import static com.aliyun.oss.event.ProgressPublisher.publishSelectProgress; + +public class SelectInputStream extends FilterInputStream { + /** + * Format of data frame + * |--frame type(4 bytes)--|--payload length(4 bytes)--|--header checksum(4 bytes)--| + * |--scanned data bytes(8 bytes)--|--payload--|--payload checksum(4 bytes)--| + */ + private static final int DATA_FRAME_MAGIC = 8388609; + + /** + * Format of continuous frame + * |--frame type(4 bytes)--|--payload length(4 bytes)--|--header checksum(4 bytes)--| + * |--scanned data bytes(8 bytes)--|--payload checksum(4 bytes)--| + */ + private static final int CONTINUOUS_FRAME_MAGIC = 8388612; + + /** + * Format of end frame + * |--frame type(4 bytes)--|--payload length(4 bytes)--|--header checksum(4 bytes)--| + * |--scanned data bytes(8 bytes)--|--total scan size(8 bytes)--| + * |--status code(4 bytes)--|--error message--|--payload checksum(4 bytes)--| + */ + private static final int END_FRAME_MAGIC = 8388613; + private static final int SELECT_VERSION = 1; + private static final long DEFAULT_NOTIFICATION_THRESHOLD = 50 * 1024 * 1024;//notify every scanned 50MB + + private long currentFrameOffset; + private long currentFramePayloadLength; + private byte[] currentFrameTypeBytes; + private byte[] currentFramePayloadLengthBytes; + private byte[] currentFrameHeaderChecksumBytes; + private byte[] scannedDataBytes; + private byte[] currentFramePayloadChecksumBytes; + private boolean finished; + private ProgressListener selectProgressListener; + private long nextNotificationScannedSize; + /** + * payload checksum is the last 4 bytes in one frame, we use this flag to indicate whether we + * need read the 4 bytes before we advance to next frame. + */ + private boolean firstReadFrame; + + public SelectInputStream(InputStream in, ProgressListener selectProgressListener) { + super(in); + currentFrameOffset = 0; + currentFramePayloadLength = 0; + currentFrameTypeBytes = new byte[4]; + currentFramePayloadLengthBytes = new byte[4]; + currentFrameHeaderChecksumBytes = new byte[4]; + scannedDataBytes = new byte[8]; + currentFramePayloadChecksumBytes = new byte[4]; + finished = false; + firstReadFrame = true; + this.selectProgressListener = selectProgressListener; + this.nextNotificationScannedSize = DEFAULT_NOTIFICATION_THRESHOLD; + } + + private void internalRead(byte[] buf, int off, int len) throws IOException { + int bytesRead = 0; + while (bytesRead < len) { + int bytes = in.read(buf, off + bytesRead, len - bytesRead); + if (bytes < 0) { + throw new IOException("invalid input stream end found, need another " + (len - bytesRead) + "bytes"); + } + bytesRead += bytes; + } + } + + private void readFrame() throws IOException { + while (currentFrameOffset >= currentFramePayloadLength && !finished) { + if (!firstReadFrame) { + internalRead(currentFramePayloadChecksumBytes, 0, 4); + } + firstReadFrame = false; + //advance to next frame + internalRead(currentFrameTypeBytes, 0, 4); + //first byte is version byte + if (currentFrameTypeBytes[0] != SELECT_VERSION) { + throw new IOException("invalid select version found: " + currentFrameTypeBytes[0] + ", expect: " + SELECT_VERSION); + } + internalRead(currentFramePayloadLengthBytes, 0, 4); + internalRead(currentFrameHeaderChecksumBytes, 0, 4); + internalRead(scannedDataBytes, 0, 8); + + currentFrameTypeBytes[0] = 0; + int type = ByteBuffer.wrap(currentFrameTypeBytes).getInt(); + switch (type) { + case DATA_FRAME_MAGIC: + currentFramePayloadLength = ByteBuffer.wrap(currentFramePayloadLengthBytes).getInt() - 8; + currentFrameOffset = 0; + break; + case CONTINUOUS_FRAME_MAGIC: + //just break, continue + break; + case END_FRAME_MAGIC: + currentFramePayloadLength = ByteBuffer.wrap(currentFramePayloadLengthBytes).getInt() - 8; + byte[] totalScannedDataSizeBytes = new byte[8]; + internalRead(totalScannedDataSizeBytes, 0, 8); + + byte[] statusBytes = new byte[4]; + internalRead(statusBytes, 0, 4); + + int status = ByteBuffer.wrap(statusBytes).getInt(); + int errorMessageSize = (int)(currentFramePayloadLength - 12); + if (errorMessageSize > 0) { + byte[] errorMessageBytes = new byte[errorMessageSize]; + internalRead(errorMessageBytes, 0, errorMessageSize); + String error = new String(errorMessageBytes); + if (status / 100 != 2) { + throw new IOException("Oss Select encounter error: code: " + status + ", message: " + error); + } + } + finished = true; + currentFramePayloadLength = currentFrameOffset; + internalRead(currentFramePayloadChecksumBytes, 0, 4); + break; + default: + throw new IOException("unsupported frame type found: " + type); + } + //notify select progress + ProgressEventType eventType = ProgressEventType.SELECT_SCAN_EVENT; + if (finished) { + eventType = ProgressEventType.SELECT_COMPLETED_EVENT; + } + long scannedDataSize = ByteBuffer.wrap(scannedDataBytes).getLong(); + if (scannedDataSize >= nextNotificationScannedSize || finished) { + publishSelectProgress(selectProgressListener, eventType, scannedDataSize); + nextNotificationScannedSize += DEFAULT_NOTIFICATION_THRESHOLD; + } + } + } + + @Override + public int read() throws IOException { + readFrame(); + int byteRead = in.read(); + if (byteRead >= 0) { + currentFrameOffset++; + } + return byteRead; + } + + @Override + public int read(byte b[]) throws IOException { + return read(b, 0, b.length); + } + + @Override + public int read(byte[] buf, int off, int len) throws IOException { + readFrame(); + int bytesToRead = (int)Math.min(len, currentFramePayloadLength - currentFrameOffset); + if (bytesToRead != 0) { + int bytes = in.read(buf, off, bytesToRead); + if (bytes > 0) { + currentFrameOffset += bytes; + } + return bytes; + } + return -1; + } + + @Override + public int available() throws IOException { + throw new IOException("select object input stream does not support available() operation"); + } +} diff --git a/src/main/java/com/aliyun/oss/model/SelectObjectMetadata.java b/src/main/java/com/aliyun/oss/model/SelectObjectMetadata.java new file mode 100644 index 00000000..8e96bb1e --- /dev/null +++ b/src/main/java/com/aliyun/oss/model/SelectObjectMetadata.java @@ -0,0 +1,62 @@ +package com.aliyun.oss.model; + +/** + * Metadata for select object requests. + * For example, {@link CsvObjectMetadata} contains total lines so that + * users can do line-range query for select requests + */ +public class SelectObjectMetadata extends ObjectMetadata { + + private CsvObjectMetadata csvObjectMetadata; + + public SelectObjectMetadata() {} + + public SelectObjectMetadata(ObjectMetadata objectMetadata) { + setUserMetadata(objectMetadata.getUserMetadata()); + metadata.putAll(objectMetadata.getRawMetadata()); + } + + public CsvObjectMetadata getCsvObjectMetadata() { + return csvObjectMetadata; + } + + public void setCsvObjectMetadata(CsvObjectMetadata csvObjectMetadata) { + this.csvObjectMetadata = csvObjectMetadata; + } + + public SelectObjectMetadata withCsvObjectMetadata(CsvObjectMetadata csvObjectMetadata) { + setCsvObjectMetadata(csvObjectMetadata); + return this; + } + + public static class CsvObjectMetadata { + private int totalLines; + private int splits; + + public int getTotalLines() { + return totalLines; + } + + public void setTotalLines(int totalLines) { + this.totalLines = totalLines; + } + + public CsvObjectMetadata withTotalLines(int totalLines) { + setTotalLines(totalLines); + return this; + } + + public int getSplits() { + return splits; + } + + public void setSplits(int splits) { + this.splits = splits; + } + + public CsvObjectMetadata withSplits(int splits) { + setSplits(splits); + return this; + } + } +} diff --git a/src/main/java/com/aliyun/oss/model/SelectObjectRequest.java b/src/main/java/com/aliyun/oss/model/SelectObjectRequest.java new file mode 100644 index 00000000..eb8120c7 --- /dev/null +++ b/src/main/java/com/aliyun/oss/model/SelectObjectRequest.java @@ -0,0 +1,239 @@ +package com.aliyun.oss.model; + +import com.aliyun.oss.common.utils.RangeSpec; +import com.aliyun.oss.event.ProgressListener; + +import java.security.InvalidParameterException; + +import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_CSV_SELECT; + +/** + * This is the request class that is used to select an object from OSS. It + * wraps all the information needed to select this object. + * User can pass sql expression to filter csv objects. + */ +public class SelectObjectRequest extends GetObjectRequest { + private static final String LINE_RANGE_PREFIX = "line-range="; + private static final String SPLIT_RANGE_PREFIX = "split-range="; + + public enum ExpressionType { + SQL, + } + + private String expression; + private ExpressionType expressionType = ExpressionType.SQL; + private InputSerialization inputSerialization = new InputSerialization(); + private OutputSerialization outputSerialization = new OutputSerialization(); + /** + * scanned bytes progress listener for select request, + * it is different from progressListener from {@link WebServiceRequest} which used for request and response bytes + */ + private ProgressListener selectProgressListener; + + //lineRange is not a generic requirement, we will move it to somewhere else later. + private long[] lineRange; + + //splitRange is a range of splits, one split is a collection of continuous lines + private long[] splitRange; + + public SelectObjectRequest(String bucketName, String key) { + super(bucketName, key); + setProcess(SUBRESOURCE_CSV_SELECT); + } + + public long[] getLineRange() { + return lineRange; + } + + /** + * For text file, we can define line range for select operations. + * Select will only scan data between startLine and endLine, that is [startLine, endLine] + * + * * @param startLine + *

+ * Start line number + *

+ *

+ * When the start is non-negative, it means the starting line + * to select. When the start is -1, it means the range is + * determined by the end only and the end could not be -1. For + * example, when start is -1 and end is 100. It means the + * select line range will be the last 100 lines. + *

+ * @param endLine + *

+ * End line number + *

+ *

+ * When the end is non-negative, it means the ending line to + * select. When the end is -1, it means the range is determined + * by the start only and the start could not be -1. For example, + * when end is -1 and start is 100. It means the select range + * will be all exception first 100 lines. + *

+ */ + public void setLineRange(long startLine, long endLine) { + lineRange = new long[] {startLine, endLine}; + } + + public SelectObjectRequest withLineRange(long startLine, long endLine) { + setLineRange(startLine, endLine); + return this; + } + + public long[] getSplitRange() { + return splitRange; + } + + /** + * For text file, we can define split range for select operations. + * Select will only scan data between startSplit and endSplit, that is [startSplit, endSplit] + * + * @param startSplit + *

+ * Start split number + *

+ *

+ * When the start is non-negative, it means the starting split + * to select. When the start is -1, it means the range is + * determined by the end only and the end could not be -1. For + * example, when start is -1 and end is 100. It means the + * select split range will be the last 100 splits. + *

+ * + * @param endSplit + *

+ * End split number + *

+ *

+ * When the end is non-negative, it means the ending split to + * select. When the end is -1, it means the range is determined + * by the start only and the start could not be -1. For example, + * when end is -1 and start is 100. It means the select range + * will be all exception first 100 splits. + *

+ */ + public void setSplitRange(long startSplit, long endSplit) { + splitRange = new long[] {startSplit, endSplit}; + } + + public SelectObjectRequest withSplitRange(long startSplit, long endSplit) { + setSplitRange(startSplit, endSplit); + return this; + } + + @Override + public long[] getRange() { + return null; + } + + @Override + public void setRange(long start, long end) throws InvalidParameterException { + throw new InvalidParameterException("Select object does not support byte range now."); + } + + public String lineRangeToString(long[] range) { + return rangeToString(LINE_RANGE_PREFIX, range); + } + + public String splitRangeToString(long[] range) { + return rangeToString(SPLIT_RANGE_PREFIX, range); + } + + public String rangeToString(String rangePrefix, long[] range) { + RangeSpec rangeSpec = RangeSpec.parse(range); + switch (rangeSpec.getType()) { + case NORMAL_RANGE: + return String.format("%s%d-%d", rangePrefix, rangeSpec.getStart(), rangeSpec.getEnd()); + case START_TO: + return String.format("%s%d-", rangePrefix, rangeSpec.getStart()); + case TO_END: + return String.format("%s-%d", rangePrefix, rangeSpec.getEnd()); + } + + return null; + } + + /** + * Get the expression which used to filter objects + * @return + */ + public String getExpression() { + return expression; + } + + /** + * Set the expression which used to filter objects + * @param expression + */ + public void setExpression(String expression) { + this.expression = expression; + } + + public SelectObjectRequest withExpression(String expression) { + setExpression(expression); + return this; + } + + /** + * Get the expression type, we only support SQL now. + * @return + */ + public ExpressionType getExpressionType() { + return expressionType; + } + + /** + * Get the input serialization, we use this to parse data + * @return + */ + public InputSerialization getInputSerialization() { + return inputSerialization; + } + + /** + * Set the input serialization, we use this to parse data + * @param inputSerialization + */ + public void setInputSerialization(InputSerialization inputSerialization) { + this.inputSerialization = inputSerialization; + } + + public SelectObjectRequest withInputSerialization(InputSerialization inputSerialization) { + setInputSerialization(inputSerialization); + return this; + } + /** + * Get the output serialization, it defines the output format + * @return + */ + public OutputSerialization getOutputSerialization() { + return outputSerialization; + } + + /** + * Set the output serialization, it defines the output format + * @param outputSerialization + */ + public void setOutputSerialization(OutputSerialization outputSerialization) { + this.outputSerialization = outputSerialization; + } + + public SelectObjectRequest withOutputSerialization(OutputSerialization outputSerialization) { + setOutputSerialization(outputSerialization); + return this; + } + + public ProgressListener getSelectProgressListener() { + return selectProgressListener; + } + + public void setSelectProgressListener(ProgressListener selectProgressListener) { + this.selectProgressListener = selectProgressListener; + } + + public SelectObjectRequest withSelectProgressListener(ProgressListener selectProgressListener) { + setSelectProgressListener(selectProgressListener); + return this; + } +} diff --git a/src/test/java/com/aliyun/oss/integrationtests/SelectObjectTest.java b/src/test/java/com/aliyun/oss/integrationtests/SelectObjectTest.java new file mode 100644 index 00000000..4e872e10 --- /dev/null +++ b/src/test/java/com/aliyun/oss/integrationtests/SelectObjectTest.java @@ -0,0 +1,95 @@ +package com.aliyun.oss.integrationtests; + +import com.aliyun.oss.model.*; +import org.junit.Assert; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.IOException; + +public class SelectObjectTest extends TestBase { + + @Test + public void testGetSelectObjectMetadata() { + final String valid = "get-select-object-metadata-valid"; + final String validContent = "name,school,company,age\n" + + "Lora Francis,School,Staples Inc,27\n" + + "Eleanor Little,School,\"Conectiv, Inc\",43\n" + + "Rosie Hughes,School,Western Gas Resources Inc,44\n" + + "Lawrence Ross,School,MetLife Inc.,24"; + ossClient.putObject(bucketName, valid, new ByteArrayInputStream(validContent.getBytes())); + SelectObjectMetadata validSelectObjectMetadata = ossClient.createSelectObjectMetadata( + new CreateSelectObjectMetadataRequest(bucketName, valid) + .withInputSerialization(new InputSerialization().withCsvInputFormat(new CSVFormat()))); + Assert.assertEquals(5, validSelectObjectMetadata.getCsvObjectMetadata().getTotalLines()); + Assert.assertEquals(1, validSelectObjectMetadata.getCsvObjectMetadata().getSplits()); + + final String invalid = "get-select-object-metadata-invalid"; + final String invalidContent = "name,school,company,age\n" + + "Laura Rodriquez,School,Triad Hospitals Inc,39\n" + + "\",,,44\n" + + "Nora Cannon,School,Reader's Digest Association Inc.,30\n" + + "Louisa Weaver,School,Trinity Industries Inc,21\n" + + "Howard Hart,School,\"EOG Resources, Inc.\",35\n" + + "\"Ola \"\"\"\"Miller\",School,Trump Hotels & Casino Resorts Inc.,20"; + ossClient.putObject(bucketName, invalid, new ByteArrayInputStream(invalidContent.getBytes())); + try { + ossClient.createSelectObjectMetadata( + new CreateSelectObjectMetadataRequest(bucketName, invalid) + .withInputSerialization(new InputSerialization().withCsvInputFormat(new CSVFormat()))); + Assert.fail("invalid object for get select object metadata"); + } catch (Exception e) { + } + } + + @Test + public void testSelectObject() throws IOException { + final String key = "get-select-object-metadata-valid"; + final String content = "name,school,company,age\n" + + "Lora Francis,School,Staples Inc,27\n" + + "Eleanor Little,School,\"Conectiv, Inc\",43\n" + + "Rosie Hughes,School,Western Gas Resources Inc,44\n" + + "Lawrence Ross,School,MetLife Inc.,24\n"; + ossClient.putObject(bucketName, key, new ByteArrayInputStream(content.getBytes())); + + SelectObjectRequest selectObjectRequest = + new SelectObjectRequest(bucketName, key) + .withInputSerialization(new InputSerialization().withCsvInputFormat(new CSVFormat())) + .withOutputSerialization(new OutputSerialization().withCsvOutputFormat(new CSVFormat())); + selectObjectRequest.setExpression("select * from ossobject"); + OSSObject ossObject = ossClient.selectObject(selectObjectRequest); + byte[] buffer = new byte[1024]; + int bytesRead; + int off = 0; + while ((bytesRead = ossObject.getObjectContent().read(buffer, off, 1024 - off)) != -1) { + off += bytesRead; + } + + Assert.assertEquals(new String(buffer, 0, off), content.substring(content.indexOf("\n") + 1)); + + selectObjectRequest.setLineRange(1, 3); + OSSObject rangeOssObject = ossClient.selectObject(selectObjectRequest); + off = 0; + while ((bytesRead = rangeOssObject.getObjectContent().read(buffer, off, 1024 - off)) != -1) { + off += bytesRead; + } + Assert.assertEquals(new String(buffer, 0, off), + "Lora Francis,School,Staples Inc,27\n" + + "Eleanor Little,School,\"Conectiv, Inc\",43\n" + + "Rosie Hughes,School,Western Gas Resources Inc,44\n"); + + selectObjectRequest.setLineRange(5, 10); + try { + ossClient.selectObject(selectObjectRequest); + Assert.fail("invalid line range for select object request"); + } catch (Exception e) { + } + + selectObjectRequest.setSplitRange(5, 10); + try { + ossClient.selectObject(selectObjectRequest); + Assert.fail("both split range and line range have been set for select object request"); + } catch (Exception e) { + } + } +} From 9e98be53caaf24ebe7f269cb799eede116f4c842 Mon Sep 17 00:00:00 2001 From: "jinhu.wjh" Date: Sun, 19 Aug 2018 15:48:07 +0800 Subject: [PATCH 03/10] add select sdk --- .../oss/common/parser/RequestMarshallers.java | 7 ++- .../com/aliyun/oss/internal/OSSHeaders.java | 1 + .../oss/internal/OSSObjectOperation.java | 6 ++- .../aliyun/oss/model/OutputSerialization.java | 26 +++++++--- .../aliyun/oss/model/SelectInputStream.java | 50 ++++++++++++++++--- .../aliyun/oss/model/SelectObjectRequest.java | 14 ++++++ 6 files changed, 88 insertions(+), 16 deletions(-) diff --git a/src/main/java/com/aliyun/oss/common/parser/RequestMarshallers.java b/src/main/java/com/aliyun/oss/common/parser/RequestMarshallers.java index dd88628c..1e9bb561 100644 --- a/src/main/java/com/aliyun/oss/common/parser/RequestMarshallers.java +++ b/src/main/java/com/aliyun/oss/common/parser/RequestMarshallers.java @@ -513,6 +513,9 @@ public byte[] marshall(SelectObjectRequest request) { xmlBody.append(""); xmlBody.append("" + BinaryUtil.toBase64String(request.getExpression().getBytes()) + ""); + xmlBody.append(""); + xmlBody.append("" + request.isSkipPartialDataRecord() + ""); + xmlBody.append(""); InputSerialization inputSerialization = request.getInputSerialization(); CSVFormat csvInputFormat = inputSerialization.getCsvInputFormat(); xmlBody.append(""); @@ -539,9 +542,11 @@ public byte[] marshall(SelectObjectRequest request) { xmlBody.append("" + BinaryUtil.toBase64String(csvOutputFormat.getRecordDelimiter().getBytes()) + ""); xmlBody.append("" + BinaryUtil.toBase64String(csvOutputFormat.getFieldDelimiter().toString().getBytes()) + ""); xmlBody.append("" + BinaryUtil.toBase64String(csvOutputFormat.getQuoteChar().toString().getBytes()) + ""); - xmlBody.append("" + outputSerialization.isKeepAllColumns() + ""); xmlBody.append(""); + xmlBody.append("" + outputSerialization.isKeepAllColumns() + ""); xmlBody.append("" + outputSerialization.isOutputRawData() + ""); + xmlBody.append("" + outputSerialization.isOutputHeader() + ""); + xmlBody.append("" + outputSerialization.isPayloadCrcEnabled() + ""); xmlBody.append(""); xmlBody.append(""); diff --git a/src/main/java/com/aliyun/oss/internal/OSSHeaders.java b/src/main/java/com/aliyun/oss/internal/OSSHeaders.java index 768ac646..6700d09d 100644 --- a/src/main/java/com/aliyun/oss/internal/OSSHeaders.java +++ b/src/main/java/com/aliyun/oss/internal/OSSHeaders.java @@ -83,6 +83,7 @@ public interface OSSHeaders extends HttpHeaders { static final String OSS_SELECT_PREFIX = "x-oss-select"; static final String OSS_SELECT_CSV_ROWS = OSS_SELECT_PREFIX + "-csv-rows"; + static final String OSS_SELECT_OUTPUT_RAW = OSS_SELECT_PREFIX + "-output-raw"; static final String OSS_SELECT_CSV_SPLITS = OSS_SELECT_PREFIX + "-csv-splits"; static final String OSS_SELECT_INPUT_LINE_RANGE = OSS_SELECT_PREFIX + "-line-range"; static final String OSS_SELECT_INPUT_SPLIT_RANGE = OSS_SELECT_PREFIX + "-split-range"; diff --git a/src/main/java/com/aliyun/oss/internal/OSSObjectOperation.java b/src/main/java/com/aliyun/oss/internal/OSSObjectOperation.java index 6669e4f0..8bee1a7a 100644 --- a/src/main/java/com/aliyun/oss/internal/OSSObjectOperation.java +++ b/src/main/java/com/aliyun/oss/internal/OSSObjectOperation.java @@ -33,6 +33,7 @@ import static com.aliyun.oss.internal.OSSConstants.DEFAULT_CHARSET_NAME; import static com.aliyun.oss.internal.OSSHeaders.OSS_SELECT_CSV_ROWS; import static com.aliyun.oss.internal.OSSHeaders.OSS_SELECT_CSV_SPLITS; +import static com.aliyun.oss.internal.OSSHeaders.OSS_SELECT_OUTPUT_RAW; import static com.aliyun.oss.internal.OSSUtils.OSS_RESOURCE_MANAGER; import static com.aliyun.oss.internal.OSSUtils.addDateHeader; import static com.aliyun.oss.internal.OSSUtils.addHeader; @@ -271,8 +272,9 @@ public OSSObject selectObject(SelectObjectRequest selectObjectRequest) throws OS OSSObject ossObject = doOperation(request, new GetObjectResponseParser(bucketName, key), bucketName, key, true); publishProgress(selectProgressListener, ProgressEventType.SELECT_STARTED_EVENT); InputStream inputStream = ossObject.getObjectContent(); - if (!selectObjectRequest.getOutputSerialization().isOutputRawData()) { - ossObject.setObjectContent(new SelectInputStream(inputStream, selectProgressListener)); + if (!Boolean.parseBoolean(ossObject.getObjectMetadata().getRawMetadata().get(OSS_SELECT_OUTPUT_RAW).toString())) { + ossObject.setObjectContent(new SelectInputStream(inputStream, selectProgressListener, + selectObjectRequest.getOutputSerialization().isPayloadCrcEnabled())); } return ossObject; } catch (RuntimeException e) { diff --git a/src/main/java/com/aliyun/oss/model/OutputSerialization.java b/src/main/java/com/aliyun/oss/model/OutputSerialization.java index 50b04532..73e6f604 100644 --- a/src/main/java/com/aliyun/oss/model/OutputSerialization.java +++ b/src/main/java/com/aliyun/oss/model/OutputSerialization.java @@ -9,8 +9,9 @@ public class OutputSerialization implements Serializable { private CSVFormat csvOutputFormat = new CSVFormat(); private String compressionType = CompressionType.NONE.name(); private boolean keepAllColumns = false; - private boolean crcEnabled = false; + private boolean payloadCrcEnabled = false; private boolean outputRawData = false; + private boolean outputHeader = false; public CSVFormat getCsvOutputFormat() { return csvOutputFormat; @@ -51,16 +52,16 @@ public OutputSerialization withKeepAllColumns(boolean keepAllColumns) { return this; } - public boolean isCrcEnabled() { - return crcEnabled; + public boolean isPayloadCrcEnabled() { + return payloadCrcEnabled; } - public void setCrcEnabled(boolean crcEnabled) { - this.crcEnabled = crcEnabled; + public void setPayloadCrcEnabled(boolean payloadCrcEnabled) { + this.payloadCrcEnabled = payloadCrcEnabled; } public OutputSerialization withCrcEnabled(boolean crcEnabled) { - setCrcEnabled(crcEnabled); + setPayloadCrcEnabled(crcEnabled); return this; } @@ -76,4 +77,17 @@ public OutputSerialization withOutputRawData(boolean outputRawData) { setOutputRawData(outputRawData); return this; } + + public boolean isOutputHeader() { + return outputHeader; + } + + public void setOutputHeader(boolean outputHeader) { + this.outputHeader = outputHeader; + } + + public OutputSerialization withOutputHeader(boolean outputHeader) { + setOutputHeader(outputHeader); + return this; + } } diff --git a/src/main/java/com/aliyun/oss/model/SelectInputStream.java b/src/main/java/com/aliyun/oss/model/SelectInputStream.java index 673a4dd3..c7715818 100644 --- a/src/main/java/com/aliyun/oss/model/SelectInputStream.java +++ b/src/main/java/com/aliyun/oss/model/SelectInputStream.java @@ -7,8 +7,8 @@ import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.util.zip.CRC32; -import static com.aliyun.oss.event.ProgressPublisher.publishProgress; import static com.aliyun.oss.event.ProgressPublisher.publishSelectProgress; public class SelectInputStream extends FilterInputStream { @@ -46,13 +46,15 @@ public class SelectInputStream extends FilterInputStream { private boolean finished; private ProgressListener selectProgressListener; private long nextNotificationScannedSize; + private boolean payloadCrcEnabled; + private CRC32 crc32; /** * payload checksum is the last 4 bytes in one frame, we use this flag to indicate whether we * need read the 4 bytes before we advance to next frame. */ private boolean firstReadFrame; - public SelectInputStream(InputStream in, ProgressListener selectProgressListener) { + public SelectInputStream(InputStream in, ProgressListener selectProgressListener, boolean payloadCrcEnabled) { super(in); currentFrameOffset = 0; currentFramePayloadLength = 0; @@ -65,6 +67,11 @@ public SelectInputStream(InputStream in, ProgressListener selectProgressListener firstReadFrame = true; this.selectProgressListener = selectProgressListener; this.nextNotificationScannedSize = DEFAULT_NOTIFICATION_THRESHOLD; + this.payloadCrcEnabled = payloadCrcEnabled; + if (this.payloadCrcEnabled) { + this.crc32 = new CRC32(); + this.crc32.reset(); + } } private void internalRead(byte[] buf, int off, int len) throws IOException { @@ -78,10 +85,22 @@ private void internalRead(byte[] buf, int off, int len) throws IOException { } } + private void validateCheckSum(byte[] checksumBytes, CRC32 crc32) throws IOException { + if (payloadCrcEnabled) { + int currentChecksum = ByteBuffer.wrap(checksumBytes).getInt(); + if (crc32.getValue() != ((long)currentChecksum & 0xffffffffL)) { + throw new IOException("select frame crc check failed, actual: " + crc32.getValue() + + ", expect: " + currentChecksum); + } + } + } + private void readFrame() throws IOException { while (currentFrameOffset >= currentFramePayloadLength && !finished) { if (!firstReadFrame) { internalRead(currentFramePayloadChecksumBytes, 0, 4); + validateCheckSum(currentFramePayloadChecksumBytes, crc32); + crc32.reset(); } firstReadFrame = false; //advance to next frame @@ -93,6 +112,9 @@ private void readFrame() throws IOException { internalRead(currentFramePayloadLengthBytes, 0, 4); internalRead(currentFrameHeaderChecksumBytes, 0, 4); internalRead(scannedDataBytes, 0, 8); + if (payloadCrcEnabled) { + crc32.update(scannedDataBytes); + } currentFrameTypeBytes[0] = 0; int type = ByteBuffer.wrap(currentFrameTypeBytes).getInt(); @@ -108,23 +130,31 @@ private void readFrame() throws IOException { currentFramePayloadLength = ByteBuffer.wrap(currentFramePayloadLengthBytes).getInt() - 8; byte[] totalScannedDataSizeBytes = new byte[8]; internalRead(totalScannedDataSizeBytes, 0, 8); - byte[] statusBytes = new byte[4]; internalRead(statusBytes, 0, 4); - + if (payloadCrcEnabled) { + crc32.update(totalScannedDataSizeBytes); + crc32.update(statusBytes); + } int status = ByteBuffer.wrap(statusBytes).getInt(); int errorMessageSize = (int)(currentFramePayloadLength - 12); + String error = ""; if (errorMessageSize > 0) { byte[] errorMessageBytes = new byte[errorMessageSize]; internalRead(errorMessageBytes, 0, errorMessageSize); - String error = new String(errorMessageBytes); - if (status / 100 != 2) { - throw new IOException("Oss Select encounter error: code: " + status + ", message: " + error); + error = new String(errorMessageBytes); + if (payloadCrcEnabled) { + crc32.update(errorMessageBytes); } } finished = true; currentFramePayloadLength = currentFrameOffset; internalRead(currentFramePayloadChecksumBytes, 0, 4); + + validateCheckSum(currentFramePayloadChecksumBytes, crc32); + if (status / 100 != 2) { + throw new IOException("Oss Select encounter error: code: " + status + ", message: " + error); + } break; default: throw new IOException("unsupported frame type found: " + type); @@ -148,6 +178,9 @@ public int read() throws IOException { int byteRead = in.read(); if (byteRead >= 0) { currentFrameOffset++; + if (payloadCrcEnabled) { + crc32.update(byteRead); + } } return byteRead; } @@ -165,6 +198,9 @@ public int read(byte[] buf, int off, int len) throws IOException { int bytes = in.read(buf, off, bytesToRead); if (bytes > 0) { currentFrameOffset += bytes; + if (payloadCrcEnabled) { + crc32.update(buf, off, bytes); + } } return bytes; } diff --git a/src/main/java/com/aliyun/oss/model/SelectObjectRequest.java b/src/main/java/com/aliyun/oss/model/SelectObjectRequest.java index eb8120c7..a61935e6 100644 --- a/src/main/java/com/aliyun/oss/model/SelectObjectRequest.java +++ b/src/main/java/com/aliyun/oss/model/SelectObjectRequest.java @@ -21,6 +21,7 @@ public enum ExpressionType { } private String expression; + private boolean skipPartialDataRecord = false; private ExpressionType expressionType = ExpressionType.SQL; private InputSerialization inputSerialization = new InputSerialization(); private OutputSerialization outputSerialization = new OutputSerialization(); @@ -175,6 +176,19 @@ public SelectObjectRequest withExpression(String expression) { return this; } + public boolean isSkipPartialDataRecord() { + return skipPartialDataRecord; + } + + public void setSkipPartialDataRecord(boolean skipPartialDataRecord) { + this.skipPartialDataRecord = skipPartialDataRecord; + } + + public SelectObjectRequest withSkipPartialDataRecord(boolean skipPartialDataRecord) { + setSkipPartialDataRecord(skipPartialDataRecord); + return this; + } + /** * Get the expression type, we only support SQL now. * @return From 76bea79ea95b44a9c15062bb1eeda6e87585b01f Mon Sep 17 00:00:00 2001 From: "jinhu.wjh" Date: Tue, 21 Aug 2018 11:31:04 +0800 Subject: [PATCH 04/10] add select sdk --- src/main/java/com/aliyun/oss/model/SelectInputStream.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/aliyun/oss/model/SelectInputStream.java b/src/main/java/com/aliyun/oss/model/SelectInputStream.java index c7715818..a913456b 100644 --- a/src/main/java/com/aliyun/oss/model/SelectInputStream.java +++ b/src/main/java/com/aliyun/oss/model/SelectInputStream.java @@ -92,6 +92,7 @@ private void validateCheckSum(byte[] checksumBytes, CRC32 crc32) throws IOExcept throw new IOException("select frame crc check failed, actual: " + crc32.getValue() + ", expect: " + currentChecksum); } + crc32.reset(); } } @@ -100,7 +101,6 @@ private void readFrame() throws IOException { if (!firstReadFrame) { internalRead(currentFramePayloadChecksumBytes, 0, 4); validateCheckSum(currentFramePayloadChecksumBytes, crc32); - crc32.reset(); } firstReadFrame = false; //advance to next frame From f26751a301e7885013a7c9c2dcd1c1c52ff2f1d6 Mon Sep 17 00:00:00 2001 From: "jinhu.wjh" Date: Sat, 25 Aug 2018 21:02:51 +0800 Subject: [PATCH 05/10] change header default to none --- src/main/java/com/aliyun/oss/model/CSVFormat.java | 2 +- .../com/aliyun/oss/integrationtests/SelectObjectTest.java | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/aliyun/oss/model/CSVFormat.java b/src/main/java/com/aliyun/oss/model/CSVFormat.java index 06304b7b..d0f5aa49 100644 --- a/src/main/java/com/aliyun/oss/model/CSVFormat.java +++ b/src/main/java/com/aliyun/oss/model/CSVFormat.java @@ -9,7 +9,7 @@ public static enum Header { Use // we can use csv header in select sql } //Define the first line of input. Valid values: None, Ignore, Use. - private Header headerInfo = Header.Ignore; + private Header headerInfo = Header.None; //Define the delimiter for records private String recordDelimiter = "\n"; diff --git a/src/test/java/com/aliyun/oss/integrationtests/SelectObjectTest.java b/src/test/java/com/aliyun/oss/integrationtests/SelectObjectTest.java index 4e872e10..4c8722c4 100644 --- a/src/test/java/com/aliyun/oss/integrationtests/SelectObjectTest.java +++ b/src/test/java/com/aliyun/oss/integrationtests/SelectObjectTest.java @@ -54,7 +54,8 @@ public void testSelectObject() throws IOException { SelectObjectRequest selectObjectRequest = new SelectObjectRequest(bucketName, key) - .withInputSerialization(new InputSerialization().withCsvInputFormat(new CSVFormat())) + .withInputSerialization(new InputSerialization().withCsvInputFormat( + new CSVFormat().withHeaderInfo(CSVFormat.Header.Ignore))) .withOutputSerialization(new OutputSerialization().withCsvOutputFormat(new CSVFormat())); selectObjectRequest.setExpression("select * from ossobject"); OSSObject ossObject = ossClient.selectObject(selectObjectRequest); @@ -67,6 +68,10 @@ public void testSelectObject() throws IOException { Assert.assertEquals(new String(buffer, 0, off), content.substring(content.indexOf("\n") + 1)); + ossClient.createSelectObjectMetadata( + new CreateSelectObjectMetadataRequest(bucketName, key) + .withInputSerialization(new InputSerialization().withCsvInputFormat(new CSVFormat()))); + selectObjectRequest.setLineRange(1, 3); OSSObject rangeOssObject = ossClient.selectObject(selectObjectRequest); off = 0; From 84d375b76fdb859cb466c7e7fb1b8dc329c06bbf Mon Sep 17 00:00:00 2001 From: "jinhu.wjh" Date: Wed, 29 Aug 2018 11:47:33 +0800 Subject: [PATCH 06/10] add create select meta frame --- .../oss/internal/OSSObjectOperation.java | 26 ++- .../model/CreateSelectMetaInputStream.java | 183 ++++++++++++++++++ .../CreateSelectObjectMetadataRequest.java | 16 ++ .../oss/model/SelectObjectMetadata.java | 8 +- 4 files changed, 222 insertions(+), 11 deletions(-) create mode 100644 src/main/java/com/aliyun/oss/model/CreateSelectMetaInputStream.java diff --git a/src/main/java/com/aliyun/oss/internal/OSSObjectOperation.java b/src/main/java/com/aliyun/oss/internal/OSSObjectOperation.java index 8bee1a7a..20c18a1d 100644 --- a/src/main/java/com/aliyun/oss/internal/OSSObjectOperation.java +++ b/src/main/java/com/aliyun/oss/internal/OSSObjectOperation.java @@ -218,13 +218,25 @@ public SelectObjectMetadata createSelectObjectMetadata(CreateSelectObjectMetadat .setBucket(bucketName).setKey(key).setOriginalRequest(genericRequest) .build(); - ObjectMetadata objectMetadata = doOperation(request, getObjectMetadataResponseParser, bucketName, key, true, null, null); - SelectObjectMetadata selectObjectMetadata = new SelectObjectMetadata(objectMetadata); - selectObjectMetadata.setCsvObjectMetadata( - new SelectObjectMetadata.CsvObjectMetadata() - .withTotalLines(Integer.parseInt(objectMetadata.getRawMetadata().get(OSS_SELECT_CSV_ROWS).toString())) - .withSplits(Integer.parseInt(objectMetadata.getRawMetadata().get(OSS_SELECT_CSV_SPLITS).toString()))); - return selectObjectMetadata; + //create meta progress listener(scanned bytes) + final ProgressListener selectProgressListener = createSelectObjectMetadataRequest.getSelectProgressListener(); + try { + OSSObject ossObject = doOperation(request, new GetObjectResponseParser(bucketName, key), bucketName, key, true); + publishProgress(selectProgressListener, ProgressEventType.SELECT_STARTED_EVENT); + SelectObjectMetadata selectObjectMetadata = new SelectObjectMetadata(ossObject.getObjectMetadata()); + InputStream in = ossObject.getObjectContent(); + CreateSelectMetaInputStream warppedStream = new CreateSelectMetaInputStream(in, selectObjectMetadata, selectProgressListener); + while (warppedStream.read() != -1) { + //read until eof + } + return selectObjectMetadata; + } catch (IOException e) { + publishProgress(selectProgressListener, ProgressEventType.SELECT_FAILED_EVENT); + throw new RuntimeException(e); + } catch (RuntimeException e) { + publishProgress(selectProgressListener, ProgressEventType.SELECT_FAILED_EVENT); + throw e; + } } /** diff --git a/src/main/java/com/aliyun/oss/model/CreateSelectMetaInputStream.java b/src/main/java/com/aliyun/oss/model/CreateSelectMetaInputStream.java new file mode 100644 index 00000000..a3fc05e5 --- /dev/null +++ b/src/main/java/com/aliyun/oss/model/CreateSelectMetaInputStream.java @@ -0,0 +1,183 @@ +package com.aliyun.oss.model; + +import com.aliyun.oss.event.ProgressEventType; +import com.aliyun.oss.event.ProgressListener; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.zip.CRC32; + +import static com.aliyun.oss.event.ProgressPublisher.publishSelectProgress; + +public class CreateSelectMetaInputStream extends FilterInputStream { + /** + * Format of continuous frame + * |--frame type(4 bytes)--|--payload length(4 bytes)--|--header checksum(4 bytes)--| + * |--scanned data bytes(8 bytes)--|--payload checksum(4 bytes)--| + */ + private static final int CONTINUOUS_FRAME_MAGIC = 8388612; + + /** + * Format of end frame + * |--frame type(4 bytes)--|--payload length(4 bytes)--|--header checksum(4 bytes)--| + * |--scanned data bytes(8 bytes)--|--total scan size(8 bytes)--| + * |--status code(4 bytes)--|--total splits count(4 bytes)--| + * |--total lines(8 bytes)--|--columns count(4bytes)--|--payload checksum(4 bytes)--| + */ + private static final int END_FRAME_MAGIC = 8388614; + private static final int SELECT_VERSION = 1; + private static final long DEFAULT_NOTIFICATION_THRESHOLD = 50 * 1024 * 1024;//notify every scanned 50MB + + private long currentFrameOffset; + private long currentFramePayloadLength; + private byte[] currentFrameTypeBytes; + private byte[] currentFramePayloadLengthBytes; + private byte[] currentFrameHeaderChecksumBytes; + private byte[] scannedDataBytes; + private byte[] currentFramePayloadChecksumBytes; + private boolean finished; + private ProgressListener selectProgressListener; + private long nextNotificationScannedSize; + private CRC32 crc32; + private SelectObjectMetadata selectObjectMetadata; + /** + * payload checksum is the last 4 bytes in one frame, we use this flag to indicate whether we + * need read the 4 bytes before we advance to next frame. + */ + private boolean firstReadFrame; + + public CreateSelectMetaInputStream(InputStream in, SelectObjectMetadata selectObjectMetadata, ProgressListener selectProgressListener) { + super(in); + currentFrameOffset = 0; + currentFramePayloadLength = 0; + currentFrameTypeBytes = new byte[4]; + currentFramePayloadLengthBytes = new byte[4]; + currentFrameHeaderChecksumBytes = new byte[4]; + scannedDataBytes = new byte[8]; + currentFramePayloadChecksumBytes = new byte[4]; + finished = false; + firstReadFrame = true; + this.selectObjectMetadata = selectObjectMetadata; + this.selectProgressListener = selectProgressListener; + this.nextNotificationScannedSize = DEFAULT_NOTIFICATION_THRESHOLD; + this.crc32 = new CRC32(); + this.crc32.reset(); + } + + private void internalRead(byte[] buf, int off, int len) throws IOException { + int bytesRead = 0; + while (bytesRead < len) { + int bytes = in.read(buf, off + bytesRead, len - bytesRead); + if (bytes < 0) { + throw new IOException("invalid input stream end found, need another " + (len - bytesRead) + "bytes"); + } + bytesRead += bytes; + } + } + + private void validateCheckSum(byte[] checksumBytes, CRC32 crc32) throws IOException { + int currentChecksum = ByteBuffer.wrap(checksumBytes).getInt(); + if (crc32.getValue() != ((long)currentChecksum & 0xffffffffL)) { + throw new IOException("select frame crc check failed, actual: " + crc32.getValue() + + ", expect: " + currentChecksum); + } + crc32.reset(); + } + + private void readFrame() throws IOException { + while (currentFrameOffset >= currentFramePayloadLength && !finished) { + if (!firstReadFrame) { + internalRead(currentFramePayloadChecksumBytes, 0, 4); + validateCheckSum(currentFramePayloadChecksumBytes, crc32); + } + firstReadFrame = false; + //advance to next frame + internalRead(currentFrameTypeBytes, 0, 4); + //first byte is version byte + if (currentFrameTypeBytes[0] != SELECT_VERSION) { + throw new IOException("invalid select version found: " + currentFrameTypeBytes[0] + ", expect: " + SELECT_VERSION); + } + internalRead(currentFramePayloadLengthBytes, 0, 4); + internalRead(currentFrameHeaderChecksumBytes, 0, 4); + internalRead(scannedDataBytes, 0, 8); + crc32.update(scannedDataBytes); + + currentFrameTypeBytes[0] = 0; + int type = ByteBuffer.wrap(currentFrameTypeBytes).getInt(); + switch (type) { + case CONTINUOUS_FRAME_MAGIC: + //just break, continue + break; + case END_FRAME_MAGIC: + currentFramePayloadLength = ByteBuffer.wrap(currentFramePayloadLengthBytes).getInt() - 8; + byte[] totalScannedDataSizeBytes = new byte[8]; + internalRead(totalScannedDataSizeBytes, 0, 8); + byte[] statusBytes = new byte[4]; + internalRead(statusBytes, 0, 4); + byte[] splitBytes = new byte[4]; + internalRead(splitBytes, 0, 4); + byte[] totalLineBytes = new byte[8]; + internalRead(totalLineBytes, 0, 8); + byte[] columnBytes = new byte[4]; + internalRead(columnBytes, 0, 4); + + crc32.update(totalScannedDataSizeBytes); + crc32.update(statusBytes); + crc32.update(splitBytes); + crc32.update(totalLineBytes); + crc32.update(columnBytes); + int status = ByteBuffer.wrap(statusBytes).getInt(); + finished = true; + currentFramePayloadLength = currentFrameOffset; + internalRead(currentFramePayloadChecksumBytes, 0, 4); + + validateCheckSum(currentFramePayloadChecksumBytes, crc32); + if (status / 100 != 2) { + throw new IOException("Oss Select create meta encounter error: code: " + status); + } + + selectObjectMetadata.setCsvObjectMetadata( + new SelectObjectMetadata.CsvObjectMetadata() + .withSplits(ByteBuffer.wrap(splitBytes).getInt()) + .withTotalLines(ByteBuffer.wrap(totalLineBytes).getLong())); + break; + default: + throw new IOException("unsupported frame type found: " + type); + } + //notify create select meta progress + ProgressEventType eventType = ProgressEventType.SELECT_SCAN_EVENT; + if (finished) { + eventType = ProgressEventType.SELECT_COMPLETED_EVENT; + } + long scannedDataSize = ByteBuffer.wrap(scannedDataBytes).getLong(); + if (scannedDataSize >= nextNotificationScannedSize || finished) { + publishSelectProgress(selectProgressListener, eventType, scannedDataSize); + nextNotificationScannedSize += DEFAULT_NOTIFICATION_THRESHOLD; + } + } + } + + @Override + public int read() throws IOException { + readFrame(); + return -1; + } + + @Override + public int read(byte b[]) throws IOException { + return read(b, 0, b.length); + } + + @Override + public int read(byte[] buf, int off, int len) throws IOException { + readFrame(); + return -1; + } + + @Override + public int available() throws IOException { + throw new IOException("create select meta input stream does not support available() operation"); + } +} diff --git a/src/main/java/com/aliyun/oss/model/CreateSelectObjectMetadataRequest.java b/src/main/java/com/aliyun/oss/model/CreateSelectObjectMetadataRequest.java index fda0ab0b..6700433f 100644 --- a/src/main/java/com/aliyun/oss/model/CreateSelectObjectMetadataRequest.java +++ b/src/main/java/com/aliyun/oss/model/CreateSelectObjectMetadataRequest.java @@ -1,11 +1,14 @@ package com.aliyun.oss.model; +import com.aliyun.oss.event.ProgressListener; + import static com.aliyun.oss.internal.RequestParameters.SUBRESOURCE_CSV_META; public class CreateSelectObjectMetadataRequest extends HeadObjectRequest { private String process; private InputSerialization inputSerialization = new InputSerialization(); private boolean overwrite; + private ProgressListener selectProgressListener; public CreateSelectObjectMetadataRequest(String bucketName, String key) { super(bucketName, key); @@ -51,4 +54,17 @@ public CreateSelectObjectMetadataRequest withOverwrite(boolean overwrite) { setOverwrite(overwrite); return this; } + + public ProgressListener getSelectProgressListener() { + return selectProgressListener; + } + + public void setSelectProgressListener(ProgressListener selectProgressListener) { + this.selectProgressListener = selectProgressListener; + } + + public CreateSelectObjectMetadataRequest withSelectProgressListener(ProgressListener selectProgressListener) { + setSelectProgressListener(selectProgressListener); + return this; + } } diff --git a/src/main/java/com/aliyun/oss/model/SelectObjectMetadata.java b/src/main/java/com/aliyun/oss/model/SelectObjectMetadata.java index 8e96bb1e..a4e7070a 100644 --- a/src/main/java/com/aliyun/oss/model/SelectObjectMetadata.java +++ b/src/main/java/com/aliyun/oss/model/SelectObjectMetadata.java @@ -30,18 +30,18 @@ public SelectObjectMetadata withCsvObjectMetadata(CsvObjectMetadata csvObjectMet } public static class CsvObjectMetadata { - private int totalLines; + private long totalLines; private int splits; - public int getTotalLines() { + public long getTotalLines() { return totalLines; } - public void setTotalLines(int totalLines) { + public void setTotalLines(long totalLines) { this.totalLines = totalLines; } - public CsvObjectMetadata withTotalLines(int totalLines) { + public CsvObjectMetadata withTotalLines(long totalLines) { setTotalLines(totalLines); return this; } From eb25a5e3b567b9e09589a9b297d11116688a8f3e Mon Sep 17 00:00:00 2001 From: "jinhu.wjh" Date: Mon, 3 Sep 2018 17:40:05 +0800 Subject: [PATCH 07/10] rename CsvObjectMetadata to CSVObjectMetadata --- .../oss/model/CreateSelectMetaInputStream.java | 2 +- .../aliyun/oss/model/SelectObjectMetadata.java | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/aliyun/oss/model/CreateSelectMetaInputStream.java b/src/main/java/com/aliyun/oss/model/CreateSelectMetaInputStream.java index a3fc05e5..9d4d6056 100644 --- a/src/main/java/com/aliyun/oss/model/CreateSelectMetaInputStream.java +++ b/src/main/java/com/aliyun/oss/model/CreateSelectMetaInputStream.java @@ -139,7 +139,7 @@ private void readFrame() throws IOException { } selectObjectMetadata.setCsvObjectMetadata( - new SelectObjectMetadata.CsvObjectMetadata() + new SelectObjectMetadata.CSVObjectMetadata() .withSplits(ByteBuffer.wrap(splitBytes).getInt()) .withTotalLines(ByteBuffer.wrap(totalLineBytes).getLong())); break; diff --git a/src/main/java/com/aliyun/oss/model/SelectObjectMetadata.java b/src/main/java/com/aliyun/oss/model/SelectObjectMetadata.java index a4e7070a..a6b25619 100644 --- a/src/main/java/com/aliyun/oss/model/SelectObjectMetadata.java +++ b/src/main/java/com/aliyun/oss/model/SelectObjectMetadata.java @@ -2,12 +2,12 @@ /** * Metadata for select object requests. - * For example, {@link CsvObjectMetadata} contains total lines so that + * For example, {@link CSVObjectMetadata} contains total lines so that * users can do line-range query for select requests */ public class SelectObjectMetadata extends ObjectMetadata { - private CsvObjectMetadata csvObjectMetadata; + private CSVObjectMetadata csvObjectMetadata; public SelectObjectMetadata() {} @@ -16,20 +16,20 @@ public SelectObjectMetadata(ObjectMetadata objectMetadata) { metadata.putAll(objectMetadata.getRawMetadata()); } - public CsvObjectMetadata getCsvObjectMetadata() { + public CSVObjectMetadata getCsvObjectMetadata() { return csvObjectMetadata; } - public void setCsvObjectMetadata(CsvObjectMetadata csvObjectMetadata) { + public void setCsvObjectMetadata(CSVObjectMetadata csvObjectMetadata) { this.csvObjectMetadata = csvObjectMetadata; } - public SelectObjectMetadata withCsvObjectMetadata(CsvObjectMetadata csvObjectMetadata) { + public SelectObjectMetadata withCsvObjectMetadata(CSVObjectMetadata csvObjectMetadata) { setCsvObjectMetadata(csvObjectMetadata); return this; } - public static class CsvObjectMetadata { + public static class CSVObjectMetadata { private long totalLines; private int splits; @@ -41,7 +41,7 @@ public void setTotalLines(long totalLines) { this.totalLines = totalLines; } - public CsvObjectMetadata withTotalLines(long totalLines) { + public CSVObjectMetadata withTotalLines(long totalLines) { setTotalLines(totalLines); return this; } @@ -54,7 +54,7 @@ public void setSplits(int splits) { this.splits = splits; } - public CsvObjectMetadata withSplits(int splits) { + public CSVObjectMetadata withSplits(int splits) { setSplits(splits); return this; } From 898824c054d48a230a6945fa6030d6e4a99c6870 Mon Sep 17 00:00:00 2001 From: "jinhu.wjh" Date: Wed, 12 Sep 2018 10:35:02 +0800 Subject: [PATCH 08/10] add error message to CreateSelectMetaInputStream --- .../oss/model/CreateSelectMetaInputStream.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/aliyun/oss/model/CreateSelectMetaInputStream.java b/src/main/java/com/aliyun/oss/model/CreateSelectMetaInputStream.java index 9d4d6056..bf144332 100644 --- a/src/main/java/com/aliyun/oss/model/CreateSelectMetaInputStream.java +++ b/src/main/java/com/aliyun/oss/model/CreateSelectMetaInputStream.java @@ -24,7 +24,7 @@ public class CreateSelectMetaInputStream extends FilterInputStream { * |--frame type(4 bytes)--|--payload length(4 bytes)--|--header checksum(4 bytes)--| * |--scanned data bytes(8 bytes)--|--total scan size(8 bytes)--| * |--status code(4 bytes)--|--total splits count(4 bytes)--| - * |--total lines(8 bytes)--|--columns count(4bytes)--|--payload checksum(4 bytes)--| + * |--total lines(8 bytes)--|--columns count(4 bytes)--|--error message(optional)--|--payload checksum(4 bytes)--| */ private static final int END_FRAME_MAGIC = 8388614; private static final int SELECT_VERSION = 1; @@ -129,13 +129,22 @@ private void readFrame() throws IOException { crc32.update(totalLineBytes); crc32.update(columnBytes); int status = ByteBuffer.wrap(statusBytes).getInt(); + int errorMessageSize = (int)(currentFramePayloadLength - 28); + String error = ""; + if (errorMessageSize > 0) { + byte[] errorMessageBytes = new byte[errorMessageSize]; + internalRead(errorMessageBytes, 0, errorMessageSize); + error = new String(errorMessageBytes); + crc32.update(errorMessageBytes); + } + finished = true; currentFramePayloadLength = currentFrameOffset; internalRead(currentFramePayloadChecksumBytes, 0, 4); validateCheckSum(currentFramePayloadChecksumBytes, crc32); if (status / 100 != 2) { - throw new IOException("Oss Select create meta encounter error: code: " + status); + throw new IOException("Oss Select create meta encounter error code: " + status + ", message: " + error); } selectObjectMetadata.setCsvObjectMetadata( From d923b3e1e4ca534ed55592dd9752838e860228f0 Mon Sep 17 00:00:00 2001 From: "jinhu.wjh" Date: Sat, 15 Sep 2018 14:54:18 +0800 Subject: [PATCH 09/10] fix ut --- .../java/com/aliyun/oss/common/utils/VersionUtilTest.java | 2 +- .../java/com/aliyun/oss/integrationtests/BucketAclTest.java | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/test/java/com/aliyun/oss/common/utils/VersionUtilTest.java b/src/test/java/com/aliyun/oss/common/utils/VersionUtilTest.java index b34d6d49..de85df33 100644 --- a/src/test/java/com/aliyun/oss/common/utils/VersionUtilTest.java +++ b/src/test/java/com/aliyun/oss/common/utils/VersionUtilTest.java @@ -28,7 +28,7 @@ public class VersionUtilTest { @Test public void testGetDefaultUserAgent() { String userAgent = VersionInfoUtils.getDefaultUserAgent(); - assertTrue(userAgent.startsWith("aliyun-sdk-java/3.1.0(")); + assertTrue(userAgent.startsWith("aliyun-sdk-java/3.3.0(")); assertEquals(userAgent.split("/").length, 4); assertEquals(userAgent.split(";").length, 2); assertEquals(userAgent.split("\\(").length, 2); diff --git a/src/test/java/com/aliyun/oss/integrationtests/BucketAclTest.java b/src/test/java/com/aliyun/oss/integrationtests/BucketAclTest.java index f3928dc7..8cccf8a2 100644 --- a/src/test/java/com/aliyun/oss/integrationtests/BucketAclTest.java +++ b/src/test/java/com/aliyun/oss/integrationtests/BucketAclTest.java @@ -173,7 +173,9 @@ public void testUnormalDoesBucketExist() { ossClient.doesBucketExist(nonexistentBucket); Assert.fail("Does bucket exist should not be successful"); } catch (Exception e) { - Assert.assertEquals("UnknownHost", e.getMessage()); + Assert.assertEquals("unormal-does-bucket-exist.oss-cn-taikang.aliyuncs.com\n" + + "[ErrorCode]: UnknownHost\n" + + "[RequestId]: Unknown", e.getMessage()); } } From 333dfe2b75335e6da2b77780b68fd820ae0f75e7 Mon Sep 17 00:00:00 2001 From: "jinhu.wjh" Date: Sat, 15 Sep 2018 16:17:11 +0800 Subject: [PATCH 10/10] add ut --- .../oss/common/parser/RequestMarshallers.java | 2 +- .../model/CreateSelectMetaInputStream.java | 2 +- .../aliyun/oss/model/OutputSerialization.java | 14 ----- .../integrationtests/SelectObjectTest.java | 55 +++++++++++++++---- 4 files changed, 45 insertions(+), 28 deletions(-) diff --git a/src/main/java/com/aliyun/oss/common/parser/RequestMarshallers.java b/src/main/java/com/aliyun/oss/common/parser/RequestMarshallers.java index 1e9bb561..78130373 100644 --- a/src/main/java/com/aliyun/oss/common/parser/RequestMarshallers.java +++ b/src/main/java/com/aliyun/oss/common/parser/RequestMarshallers.java @@ -525,7 +525,7 @@ public byte[] marshall(SelectObjectRequest request) { xmlBody.append("" + BinaryUtil.toBase64String(csvInputFormat.getRecordDelimiter().getBytes()) + ""); xmlBody.append("" + BinaryUtil.toBase64String(csvInputFormat.getFieldDelimiter().toString().getBytes()) + ""); xmlBody.append("" + BinaryUtil.toBase64String(csvInputFormat.getQuoteChar().toString().getBytes()) + ""); - xmlBody.append("" + BinaryUtil.toBase64String(csvInputFormat.getCommentChar().toString().getBytes()) + ""); + xmlBody.append("" + BinaryUtil.toBase64String(csvInputFormat.getCommentChar().toString().getBytes()) + ""); if (request.getLineRange() != null) { xmlBody.append("" + request.lineRangeToString(request.getLineRange()) + ""); diff --git a/src/main/java/com/aliyun/oss/model/CreateSelectMetaInputStream.java b/src/main/java/com/aliyun/oss/model/CreateSelectMetaInputStream.java index bf144332..478b0e64 100644 --- a/src/main/java/com/aliyun/oss/model/CreateSelectMetaInputStream.java +++ b/src/main/java/com/aliyun/oss/model/CreateSelectMetaInputStream.java @@ -147,7 +147,7 @@ private void readFrame() throws IOException { throw new IOException("Oss Select create meta encounter error code: " + status + ", message: " + error); } - selectObjectMetadata.setCsvObjectMetadata( + selectObjectMetadata.withCsvObjectMetadata( new SelectObjectMetadata.CSVObjectMetadata() .withSplits(ByteBuffer.wrap(splitBytes).getInt()) .withTotalLines(ByteBuffer.wrap(totalLineBytes).getLong())); diff --git a/src/main/java/com/aliyun/oss/model/OutputSerialization.java b/src/main/java/com/aliyun/oss/model/OutputSerialization.java index 73e6f604..93efe605 100644 --- a/src/main/java/com/aliyun/oss/model/OutputSerialization.java +++ b/src/main/java/com/aliyun/oss/model/OutputSerialization.java @@ -7,7 +7,6 @@ */ public class OutputSerialization implements Serializable { private CSVFormat csvOutputFormat = new CSVFormat(); - private String compressionType = CompressionType.NONE.name(); private boolean keepAllColumns = false; private boolean payloadCrcEnabled = false; private boolean outputRawData = false; @@ -26,19 +25,6 @@ public OutputSerialization withCsvOutputFormat(CSVFormat csvFormat) { return this; } - public String getCompressionType() { - return compressionType; - } - - public void setCompressionType(CompressionType compressionType) { - this.compressionType = compressionType.name(); - } - - public OutputSerialization withCompressionType(CompressionType compressionType) { - setCompressionType(compressionType); - return this; - } - public boolean isKeepAllColumns() { return keepAllColumns; } diff --git a/src/test/java/com/aliyun/oss/integrationtests/SelectObjectTest.java b/src/test/java/com/aliyun/oss/integrationtests/SelectObjectTest.java index 4c8722c4..373c75ac 100644 --- a/src/test/java/com/aliyun/oss/integrationtests/SelectObjectTest.java +++ b/src/test/java/com/aliyun/oss/integrationtests/SelectObjectTest.java @@ -1,5 +1,6 @@ package com.aliyun.oss.integrationtests; +import com.aliyun.oss.event.ProgressEvent; import com.aliyun.oss.model.*; import org.junit.Assert; import org.junit.Test; @@ -9,6 +10,12 @@ public class SelectObjectTest extends TestBase { + private static class CustomProgressListener implements com.aliyun.oss.event.ProgressListener { + public void progressChanged(ProgressEvent progressEvent) { + System.out.println(progressEvent.getBytes()); + } + } + @Test public void testGetSelectObjectMetadata() { final String valid = "get-select-object-metadata-valid"; @@ -20,6 +27,8 @@ public void testGetSelectObjectMetadata() { ossClient.putObject(bucketName, valid, new ByteArrayInputStream(validContent.getBytes())); SelectObjectMetadata validSelectObjectMetadata = ossClient.createSelectObjectMetadata( new CreateSelectObjectMetadataRequest(bucketName, valid) + .withOverwrite(true) + .withSelectProgressListener(new CustomProgressListener()) .withInputSerialization(new InputSerialization().withCsvInputFormat(new CSVFormat()))); Assert.assertEquals(5, validSelectObjectMetadata.getCsvObjectMetadata().getTotalLines()); Assert.assertEquals(1, validSelectObjectMetadata.getCsvObjectMetadata().getSplits()); @@ -47,6 +56,7 @@ public void testSelectObject() throws IOException { final String key = "get-select-object-metadata-valid"; final String content = "name,school,company,age\n" + "Lora Francis,School,Staples Inc,27\n" + + "#Lora Francis,School,Staples Inc,27\n" + "Eleanor Little,School,\"Conectiv, Inc\",43\n" + "Rosie Hughes,School,Western Gas Resources Inc,44\n" + "Lawrence Ross,School,MetLife Inc.,24\n"; @@ -54,43 +64,64 @@ public void testSelectObject() throws IOException { SelectObjectRequest selectObjectRequest = new SelectObjectRequest(bucketName, key) - .withInputSerialization(new InputSerialization().withCsvInputFormat( - new CSVFormat().withHeaderInfo(CSVFormat.Header.Ignore))) - .withOutputSerialization(new OutputSerialization().withCsvOutputFormat(new CSVFormat())); - selectObjectRequest.setExpression("select * from ossobject"); + .withSelectProgressListener(new CustomProgressListener()) + .withSkipPartialDataRecord(false) + .withInputSerialization(new InputSerialization() + .withCompressionType(CompressionType.NONE) + .withCsvInputFormat( + new CSVFormat().withRecordDelimiter("\n") + .withQuoteChar("\"") + .withFieldDelimiter(",") + .withCommentChar("#") + .withHeaderInfo(CSVFormat.Header.Ignore))) + .withOutputSerialization(new OutputSerialization() + .withOutputHeader(false) + .withOutputRawData(false) + .withCrcEnabled(true) + .withKeepAllColumns(true) + .withCsvOutputFormat(new CSVFormat())) + .withExpression("select * from ossobject"); OSSObject ossObject = ossClient.selectObject(selectObjectRequest); byte[] buffer = new byte[1024]; int bytesRead; int off = 0; - while ((bytesRead = ossObject.getObjectContent().read(buffer, off, 1024 - off)) != -1) { - off += bytesRead; + while ((bytesRead = ossObject.getObjectContent().read()) != -1) { + buffer[off++] = (byte)bytesRead; } - Assert.assertEquals(new String(buffer, 0, off), content.substring(content.indexOf("\n") + 1)); + Assert.assertEquals(new String(buffer, 0, off), content.substring(content.indexOf("#L") + 1)); ossClient.createSelectObjectMetadata( new CreateSelectObjectMetadataRequest(bucketName, key) .withInputSerialization(new InputSerialization().withCsvInputFormat(new CSVFormat()))); selectObjectRequest.setLineRange(1, 3); + selectObjectRequest.getOutputSerialization().withKeepAllColumns(true); + selectObjectRequest.getOutputSerialization().withCrcEnabled(false); OSSObject rangeOssObject = ossClient.selectObject(selectObjectRequest); + try { + rangeOssObject.getObjectContent().available(); + Assert.fail("select object input stream does not support available() operation"); + } catch (Exception e) { + + } + off = 0; - while ((bytesRead = rangeOssObject.getObjectContent().read(buffer, off, 1024 - off)) != -1) { + while ((bytesRead = rangeOssObject.getObjectContent().read(buffer)) != -1) { off += bytesRead; } Assert.assertEquals(new String(buffer, 0, off), "Lora Francis,School,Staples Inc,27\n" + - "Eleanor Little,School,\"Conectiv, Inc\",43\n" + - "Rosie Hughes,School,Western Gas Resources Inc,44\n"); + "Eleanor Little,School,\"Conectiv, Inc\",43\n"); - selectObjectRequest.setLineRange(5, 10); + selectObjectRequest.withLineRange(6, 10); try { ossClient.selectObject(selectObjectRequest); Assert.fail("invalid line range for select object request"); } catch (Exception e) { } - selectObjectRequest.setSplitRange(5, 10); + selectObjectRequest.withSplitRange(5, 10); try { ossClient.selectObject(selectObjectRequest); Assert.fail("both split range and line range have been set for select object request");