Skip to content

Commit

Permalink
Fix download behavior for AzureStorageDriver
Browse files Browse the repository at this point in the history
AzureStorageDriver downloads to a different location from the S3StorageDriver. This makes the download behavior more consistent.

Fixes mesosphere-backup#449
  • Loading branch information
rozele committed Jun 7, 2017
1 parent da069b9 commit 9c3c8e2
Showing 1 changed file with 63 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void upload(BackupRestoreContext ctx) throws IOException {
continue;
}
LOGGER.info("Entering keyspace: {}", keyspaceDir.getName());
for (File cfDir : keyspaceDir.listFiles()) {
for (File cfDir : getColumnFamilyDir(keyspaceDir)) {
LOGGER.info("Entering column family: {}", cfDir.getName());
File snapshotDir = new File(cfDir, "snapshots");
File backupDir = new File(snapshotDir, backupName);
Expand Down Expand Up @@ -200,6 +200,7 @@ public void download(BackupRestoreContext ctx) throws IOException {
final String localLocation = ctx.getLocalLocation();
final String backupName = ctx.getName();
final String nodeId = ctx.getNodeId();
final File[] keyspaces = getNonSystemKeyspaces(ctx);

final String containerName = StringUtils.lowerCase(getContainerName(ctx.getExternalLocation()));
// https://<account_name>.blob.core.windows.net/<container_name>
Expand All @@ -210,49 +211,81 @@ public void download(BackupRestoreContext ctx) throws IOException {
ctx.getExternalLocation(), containerName);
return;
}
String keyPrefix = String.format("%s/%s", backupName, nodeId);

final Map<String, Long> snapshotFileKeys = getSnapshotFileKeys(container, keyPrefix);
LOGGER.info("Snapshot files for this node: {}", snapshotFileKeys);

for (String fileKey : snapshotFileKeys.keySet()) {
downloadFile(localLocation, container, fileKey, snapshotFileKeys.get(fileKey));
if (Objects.equals(ctx.getRestoreType(), new String("new"))) {
final Map<String, Long> snapshotFileKeys = getSnapshotFileKeys(container, keyPrefix);
LOGGER.info("Snapshot files for this node: {}", snapshotFileKeys);
for (String fileKey : snapshotFileKeys.keySet()) {
downloadFile(container, fileKey, snapshotFileKeys.get(fileKey), localLocation + File.separator + fileKey);
}
} else {
for (File keyspace : keyspaces) {
for (File cfDir : getColumnFamilyDir(keyspace)) {
final String columnFamily = cfDir.getName().substring(0, cfDir.getName().indexOf("-"));
final Map<String, Long> snapshotFileKeys = getSnapshotFileKeys(container,
backupName + "/" + nodeId + "/" + keyspace.getName() + "/" + columnFamily);
for (String fileKey : snapshotFileKeys.keySet()) {
final String destinationFile = cfDir.getAbsolutePath() + fileKey.substring(fileKey.lastIndexOf("/"));
downloadFile(container, fileKey, snapshotFileKeys.get(fileKey), destinationFile);
LOGGER.info("Keyspace {}, Column Family {}, FileKey {}, destination {}", keyspace, columnFamily, fileKey, destinationFile);
}
}
}
}
}

private void downloadFile(String localLocation, CloudBlobContainer container, String fileKey, long originalSize) {
private void downloadFile(CloudBlobContainer container, String fileKey, long originalSize, String destinationFile) {

LOGGER.info("Downloading | Local location {} | fileKey: {} | Size: {}", localLocation, fileKey, originalSize);

final String fileLocation = localLocation + File.separator + fileKey;
File file = new File(fileLocation);
// Only create parent directory once, if it doesn't exist.
if (!createParentDir(file)) {
LOGGER.error("Unable to create parent directories!");
return;
}
try {
final File snapshotFile = new File(destinationFile);
// Only create parent directory once, if it doesn't exist.
final File parentDir = new File(snapshotFile.getParent());
if (!parentDir.isDirectory()) {
final boolean parentDirCreated = parentDir.mkdirs();
if (!parentDirCreated) {
LOGGER.error(
"Error creating parent directory for file: {}. Skipping to next",
destinationFile);
return;
}
}

InputStream inputStream = null;
SnappyInputStream compress = null;
snapshotFile.createNewFile();

try (
FileOutputStream fileOutputStream = new FileOutputStream(file, true);
BufferedOutputStream bos = new BufferedOutputStream(fileOutputStream)) {
InputStream inputStream = null;
SnappyInputStream compress = null;

final CloudPageBlob pageBlobReference = container.getPageBlobReference(fileKey);
inputStream = new PageBlobInputStream(pageBlobReference);
compress = new SnappyInputStream(inputStream);
try (FileOutputStream fileOutputStream = new FileOutputStream(snapshotFile, true);
BufferedOutputStream bos = new BufferedOutputStream(fileOutputStream)) {

IOUtils.copy(compress, bos, DEFAULT_PART_SIZE_DOWNLOAD);
final CloudPageBlob pageBlobReference = container.getPageBlobReference(fileKey);
inputStream = new PageBlobInputStream(pageBlobReference);
compress = new SnappyInputStream(inputStream);

IOUtils.copy(compress, bos, DEFAULT_PART_SIZE_DOWNLOAD);
} finally {
IOUtils.closeQuietly(compress);
IOUtils.closeQuietly(inputStream);
}
} catch (Exception e) {
LOGGER.error("Unable to write file: {}", fileKey, e);
} finally {
IOUtils.closeQuietly(compress);
IOUtils.closeQuietly(inputStream);
LOGGER.error("Error downloading the file {} : {}", destinationFile, e);
throw new Exception(e);
}
}

private File[] getNonSystemKeyspaces(BackupRestoreContext ctx) {
File file = new File(ctx.getLocalLocation());
File[] directories = file.listFiles(
(current, name) -> new File(current, name).isDirectory() &&
name.compareTo("system") != 0);
return directories;
}

private static File[] getColumnFamilyDir(File keyspace) {
return keyspace.listFiles(
(current, name) -> new File(current, name).isDirectory());
}

@Override
public String downloadSchema(BackupRestoreContext ctx) throws Exception {
final String accountName = ctx.getAccountId();
Expand Down Expand Up @@ -317,18 +350,6 @@ private CloudBlobContainer getCloudBlobContainer(String accountName, String acco
return container;
}

private boolean createParentDir(File file) {
final File parentDir = new File(file.getParent());
if (!parentDir.isDirectory()) {
final boolean parentDirCreated = parentDir.mkdirs();
if (!parentDirCreated) {
LOGGER.error("Error creating parent directory for file: {}. Skipping to next");
return false;
}
}
return true;
}

private Map<String, Long> getSnapshotFileKeys(CloudBlobContainer container, String keyPrefix) {
Map<String, Long> snapshotFiles = new HashMap<>();

Expand Down

0 comments on commit 9c3c8e2

Please sign in to comment.