Skip to content

Commit

Permalink
Fix download behavior for AzureStorageDriver
Browse files Browse the repository at this point in the history
AzureStorageDriver downloads to a different location from the S3StorageDriver. This makes the download behavior more consistent.

Fixes #449
  • Loading branch information
rozele committed Jul 11, 2017
1 parent 6bb0ee6 commit 358be21
Showing 1 changed file with 69 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.security.InvalidKeyException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import static com.mesosphere.dcos.cassandra.executor.backup.azure.PageBlobOutputStream.ORIGINAL_SIZE_KEY;
Expand Down Expand Up @@ -77,7 +78,7 @@ public void upload(BackupRestoreContext ctx) throws IOException {
continue;
}
LOGGER.info("Entering keyspace: {}", keyspaceDir.getName());
for (File cfDir : keyspaceDir.listFiles()) {
for (File cfDir : getColumnFamilyDir(keyspaceDir)) {
LOGGER.info("Entering column family: {}", cfDir.getName());
File snapshotDir = new File(cfDir, "snapshots");
File backupDir = new File(snapshotDir, backupName);
Expand Down Expand Up @@ -193,13 +194,14 @@ private void uploadText(CloudBlobContainer container, String fileKey, String tex
}

@Override
public void download(BackupRestoreContext ctx) throws IOException {
public void download(BackupRestoreContext ctx) throws Exception {

final String accountName = ctx.getAccountId();
final String accountKey = ctx.getSecretKey();
final String localLocation = ctx.getLocalLocation();
final String backupName = ctx.getName();
final String nodeId = ctx.getNodeId();
final File[] keyspaces = getNonSystemKeyspaces(ctx);

final String containerName = StringUtils.lowerCase(getContainerName(ctx.getExternalLocation()));
// https://<account_name>.blob.core.windows.net/<container_name>
Expand All @@ -210,49 +212,84 @@ public void download(BackupRestoreContext ctx) throws IOException {
ctx.getExternalLocation(), containerName);
return;
}
String keyPrefix = String.format("%s/%s", backupName, nodeId);

final Map<String, Long> snapshotFileKeys = getSnapshotFileKeys(container, keyPrefix);
LOGGER.info("Snapshot files for this node: {}", snapshotFileKeys);

for (String fileKey : snapshotFileKeys.keySet()) {
downloadFile(localLocation, container, fileKey, snapshotFileKeys.get(fileKey));
if (Objects.equals(ctx.getRestoreType(), new String("new"))) {
final String keyPrefix = String.format("%s/%s", backupName, nodeId);
final Map<String, Long> snapshotFileKeys = getSnapshotFileKeys(container, keyPrefix);
LOGGER.info("Snapshot files for this node: {}", snapshotFileKeys);
for (String fileKey : snapshotFileKeys.keySet()) {
downloadFile(container, fileKey, snapshotFileKeys.get(fileKey), localLocation + File.separator + fileKey);
}
} else {
for (File keyspace : keyspaces) {
for (File cfDir : getColumnFamilyDir(keyspace)) {
final String columnFamily = cfDir.getName().substring(0, cfDir.getName().indexOf("-"));
final Map<String, Long> snapshotFileKeys = getSnapshotFileKeys(container,
backupName + "/" + nodeId + "/" + keyspace.getName() + "/" + columnFamily);
for (String fileKey : snapshotFileKeys.keySet()) {
final String destinationFile = cfDir.getAbsolutePath() + fileKey.substring(fileKey.lastIndexOf("/"));
downloadFile(container, fileKey, snapshotFileKeys.get(fileKey), destinationFile);
LOGGER.info("Keyspace {}, Column Family {}, FileKey {}, destination {}", keyspace, columnFamily, fileKey, destinationFile);
}
}
}
}
}

private void downloadFile(String localLocation, CloudBlobContainer container, String fileKey, long originalSize) {

LOGGER.info("Downloading | Local location {} | fileKey: {} | Size: {}", localLocation, fileKey, originalSize);
private void downloadFile(CloudBlobContainer container,
String fileKey,
long originalSize,
String destinationFile) throws Exception {
try {
final File snapshotFile = new File(destinationFile);
// Only create parent directory once, if it doesn't exist.
final File parentDir = new File(snapshotFile.getParent());
if (!parentDir.isDirectory()) {
final boolean parentDirCreated = parentDir.mkdirs();
if (!parentDirCreated) {
LOGGER.error(
"Error creating parent directory for file: {}. Skipping to next",
destinationFile);
return;
}
}

final String fileLocation = localLocation + File.separator + fileKey;
File file = new File(fileLocation);
// Only create parent directory once, if it doesn't exist.
if (!createParentDir(file)) {
LOGGER.error("Unable to create parent directories!");
return;
}
snapshotFile.createNewFile();

InputStream inputStream = null;
SnappyInputStream compress = null;
InputStream inputStream = null;
SnappyInputStream compress = null;

try (
FileOutputStream fileOutputStream = new FileOutputStream(file, true);
BufferedOutputStream bos = new BufferedOutputStream(fileOutputStream)) {
try (FileOutputStream fileOutputStream = new FileOutputStream(snapshotFile, true);
BufferedOutputStream bos = new BufferedOutputStream(fileOutputStream)) {

final CloudPageBlob pageBlobReference = container.getPageBlobReference(fileKey);
inputStream = new PageBlobInputStream(pageBlobReference);
compress = new SnappyInputStream(inputStream);

IOUtils.copy(compress, bos, DEFAULT_PART_SIZE_DOWNLOAD);
final CloudPageBlob pageBlobReference = container.getPageBlobReference(fileKey);
inputStream = new PageBlobInputStream(pageBlobReference);
compress = new SnappyInputStream(inputStream);

IOUtils.copy(compress, bos, DEFAULT_PART_SIZE_DOWNLOAD);
} finally {
IOUtils.closeQuietly(compress);
IOUtils.closeQuietly(inputStream);
}
} catch (Exception e) {
LOGGER.error("Unable to write file: {}", fileKey, e);
} finally {
IOUtils.closeQuietly(compress);
IOUtils.closeQuietly(inputStream);
LOGGER.error("Error downloading the file {} : {}", destinationFile, e);
throw new Exception(e);
}
}

private File[] getNonSystemKeyspaces(BackupRestoreContext ctx) {
File file = new File(ctx.getLocalLocation());
File[] directories = file.listFiles(
(current, name) -> new File(current, name).isDirectory() &&
name.compareTo("system") != 0);
return directories;
}

private static File[] getColumnFamilyDir(File keyspace) {
return keyspace.listFiles(
(current, name) -> new File(current, name).isDirectory());
}

@Override
public String downloadSchema(BackupRestoreContext ctx) throws Exception {
final String accountName = ctx.getAccountId();
Expand Down Expand Up @@ -317,18 +354,6 @@ private CloudBlobContainer getCloudBlobContainer(String accountName, String acco
return container;
}

private boolean createParentDir(File file) {
final File parentDir = new File(file.getParent());
if (!parentDir.isDirectory()) {
final boolean parentDirCreated = parentDir.mkdirs();
if (!parentDirCreated) {
LOGGER.error("Error creating parent directory for file: {}. Skipping to next");
return false;
}
}
return true;
}

private Map<String, Long> getSnapshotFileKeys(CloudBlobContainer container, String keyPrefix) {
Map<String, Long> snapshotFiles = new HashMap<>();

Expand Down

0 comments on commit 358be21

Please sign in to comment.