Skip to content

Commit

Permalink
Add schema upload/download support to AzureStorageDriver
Browse files Browse the repository at this point in the history
Uses the CloudBlobContainer abstraction to write the schema to a well-known path.

Fixes #450
  • Loading branch information
rozele committed Jul 11, 2017
1 parent 4f9b7c6 commit 6bb0ee6
Showing 1 changed file with 73 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.security.InvalidKeyException;
Expand Down Expand Up @@ -131,21 +133,29 @@ private void uploadDirectory(String localLocation,

private void uploadFile(CloudBlobContainer container, String fileKey, File sourceFile) {

PageBlobOutputStream pageBlobOutputStream = null;
SnappyOutputStream compress = null;
BufferedOutputStream bufferedOutputStream = null;
try (BufferedInputStream inputStream = new BufferedInputStream(new FileInputStream(sourceFile))) {

LOGGER.info("Initiating upload for file: {} | key: {}",
sourceFile.getAbsolutePath(), fileKey);
uploadStream(container, fileKey, inputStream);
} catch (IOException e) {
LOGGER.error("Unable to store blob", e);
}
}

private void uploadStream(CloudBlobContainer container, String fileKey, InputStream inputStream) {

PageBlobOutputStream pageBlobOutputStream = null;
SnappyOutputStream compress = null;
BufferedOutputStream bufferedOutputStream = null;
try {

final CloudPageBlob blob = container.getPageBlobReference(fileKey);
pageBlobOutputStream = new PageBlobOutputStream(blob);
bufferedOutputStream = new BufferedOutputStream(pageBlobOutputStream);

compress = new SnappyOutputStream(bufferedOutputStream, DEFAULT_PART_SIZE_UPLOAD);
IOUtils.copy(inputStream, compress, DEFAULT_PART_SIZE_UPLOAD);

} catch (StorageException | URISyntaxException | IOException e) {
LOGGER.error("Unable to store blob", e);
} finally {
Expand All @@ -157,8 +167,29 @@ private void uploadFile(CloudBlobContainer container, String fileKey, File sourc

@Override
public void uploadSchema(BackupRestoreContext ctx, String schema) {
// ToDo : Add the upload schema to Azure.
// Path: <backupname/node-id/schema.cql>
final String accountName = ctx.getAccountId();
final String accountKey = ctx.getSecretKey();
final String backupName = ctx.getName();
final String nodeId = ctx.getNodeId();

final String containerName = StringUtils.lowerCase(getContainerName(ctx.getExternalLocation()));
// https://<account_name>.blob.core.windows.net/<container_name>
final CloudBlobContainer container = getCloudBlobContainer(accountName, accountKey, containerName);

if (container == null) {
LOGGER.error("Error uploading schema. Unable to connect to {}, for container {}.",
ctx.getExternalLocation(), containerName);
return;
}

final String key = backupName + "/" + nodeId + "/" + StorageUtil.SCHEMA_FILE;
uploadText(container, key, schema);
}

private void uploadText(CloudBlobContainer container, String fileKey, String text) {
final InputStream inputStream = new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8));
LOGGER.info("Initiating upload for schema | key: {}", fileKey);
uploadStream(container, fileKey, inputStream);
}

@Override
Expand All @@ -175,8 +206,8 @@ public void download(BackupRestoreContext ctx) throws IOException {
final CloudBlobContainer container = getCloudBlobContainer(accountName, accountKey, containerName);

if (container == null) {
LOGGER.error("Error uploading snapshots. Unable to connect to {}, for container {}.",
ctx.getExternalLocation(), containerName, localLocation);
LOGGER.error("Error downloading snapshots. Unable to connect to {}, for container {}.",
ctx.getExternalLocation(), containerName);
return;
}
String keyPrefix = String.format("%s/%s", backupName, nodeId);
Expand Down Expand Up @@ -224,8 +255,40 @@ private void downloadFile(String localLocation, CloudBlobContainer container, St

@Override
public String downloadSchema(BackupRestoreContext ctx) throws Exception {
// ToDo : Add the download schema to Azure.
return new String("");
final String accountName = ctx.getAccountId();
final String accountKey = ctx.getSecretKey();
final String backupName = ctx.getName();
final String nodeId = ctx.getNodeId();

final String containerName = StringUtils.lowerCase(getContainerName(ctx.getExternalLocation()));
// https://<account_name>.blob.core.windows.net/<container_name>
final CloudBlobContainer container = getCloudBlobContainer(accountName, accountKey, containerName);

if (container == null) {
LOGGER.error("Error downloading snapshots. Unable to connect to {}, for container {}.",
ctx.getExternalLocation(), containerName);
return new String("");
}

final String key = backupName + "/" + nodeId + "/" + StorageUtil.SCHEMA_FILE;

InputStream inputStream = null;
SnappyInputStream compress = null;

try {
final CloudPageBlob pageBlobReference = container.getPageBlobReference(key);
inputStream = new PageBlobInputStream(pageBlobReference);
compress = new SnappyInputStream(inputStream);

return IOUtils.toString(compress, "UTF-8");

} catch (Exception e) {
LOGGER.error("Unable to read schema from: {}", key, e);
return new String("");
} finally {
IOUtils.closeQuietly(compress);
IOUtils.closeQuietly(inputStream);
}
}

private String getContainerName(String externalLocation) {
Expand Down

0 comments on commit 6bb0ee6

Please sign in to comment.