Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes to AzureStorageDriver #448

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,20 @@

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.security.InvalidKeyException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import static com.mesosphere.dcos.cassandra.executor.backup.azure.PageBlobOutputStream.ORIGINAL_SIZE_KEY;
Expand All @@ -41,8 +44,8 @@
*/
public class AzureStorageDriver implements BackupStorageDriver {

private final Logger logger = LoggerFactory.getLogger(getClass());

private static final Logger LOGGER = LoggerFactory.getLogger(
AzureStorageDriver.class);
private static final int DEFAULT_PART_SIZE_UPLOAD = 4 * 1024 * 1024; // Chunk size set to 4MB
private static final int DEFAULT_PART_SIZE_DOWNLOAD = 4 * 1024 * 1024; // Chunk size set to 4MB

Expand All @@ -62,7 +65,7 @@ public void upload(BackupRestoreContext ctx) throws IOException {

final File dataDirectory = new File(localLocation);
if (container == null || !dataDirectory.isDirectory()) {
logger.error("Error uploading snapshots. Unable to connect to {}, for container {} or Directory {} doesn't exist.",
LOGGER.error("Error uploading snapshots. Unable to connect to {}, for container {} or Directory {} doesn't exist.",
ctx.getExternalLocation(), containerName, localLocation);
return;
}
Expand All @@ -74,38 +77,38 @@ public void upload(BackupRestoreContext ctx) throws IOException {
// Only enter keyspace directory.
continue;
}
logger.info("Entering keyspace: {}", keyspaceDir.getName());
for (File cfDir : keyspaceDir.listFiles()) {
logger.info("Entering column family: {}", cfDir.getName());
LOGGER.info("Entering keyspace: {}", keyspaceDir.getName());
for (File cfDir : getColumnFamilyDir(keyspaceDir)) {
LOGGER.info("Entering column family: {}", cfDir.getName());
File snapshotDir = new File(cfDir, "snapshots");
File backupDir = new File(snapshotDir, backupName);
if (!StorageUtil.isValidBackupDir(keyspaceDir, cfDir, snapshotDir, backupDir)) {
logger.info("Skipping directory: {}", snapshotDir.getAbsolutePath());
LOGGER.info("Skipping directory: {}", snapshotDir.getAbsolutePath());
continue;
}
logger.info(
LOGGER.info(
"Valid backup directories. KeyspaceDir: {} | ColumnFamilyDir: {} | SnapshotDir: {} | BackupName: {}",
keyspaceDir.getAbsolutePath(), cfDir.getAbsolutePath(),
snapshotDir.getAbsolutePath(), backupName);

final Optional<File> snapshotDirectory = StorageUtil.getValidSnapshotDirectory(snapshotDir, backupName);
logger.info("Valid snapshot directory: {}", snapshotDirectory.isPresent());
LOGGER.info("Valid snapshot directory: {}", snapshotDirectory.isPresent());

if (snapshotDirectory.isPresent()) {
logger.info("Going to upload directory: {}", snapshotDirectory.get().getAbsolutePath());
LOGGER.info("Going to upload directory: {}", snapshotDirectory.get().getAbsolutePath());

uploadDirectory(snapshotDirectory.get().getAbsolutePath(), container, containerName, key,
keyspaceDir.getName(), cfDir.getName());

} else {
logger.warn(
LOGGER.warn(
"Snapshots directory: {} doesn't contain the current backup directory: {}",
snapshotDir.getName(), backupName);
}
}
}

logger.info("Done uploading snapshots for backup: {}", backupName);
LOGGER.info("Done uploading snapshots for backup: {}", backupName);
}

private void uploadDirectory(String localLocation,
Expand All @@ -115,7 +118,7 @@ private void uploadDirectory(String localLocation,
String keyspaceName,
String cfName) throws IOException {

logger.info(
LOGGER.info(
"uploadDirectory() localLocation: {}, containerName: {}, key: {}, keyspaceName: {}, cfName: {}",
localLocation, containerName, key, keyspaceName, cfName);

Expand All @@ -131,23 +134,31 @@ private void uploadDirectory(String localLocation,

private void uploadFile(CloudBlobContainer container, String fileKey, File sourceFile) {

PageBlobOutputStream pageBlobOutputStream = null;
SnappyOutputStream compress = null;
BufferedOutputStream bufferedOutputStream = null;
try (BufferedInputStream inputStream = new BufferedInputStream(new FileInputStream(sourceFile))) {

logger.info("Initiating upload for file: {} | key: {}",
LOGGER.info("Initiating upload for file: {} | key: {}",
sourceFile.getAbsolutePath(), fileKey);
uploadStream(container, fileKey, inputStream);
} catch (IOException e) {
LOGGER.error("Unable to store blob", e);
}
}

private void uploadStream(CloudBlobContainer container, String fileKey, InputStream inputStream) {

PageBlobOutputStream pageBlobOutputStream = null;
SnappyOutputStream compress = null;
BufferedOutputStream bufferedOutputStream = null;
try {

final CloudPageBlob blob = container.getPageBlobReference(fileKey);
pageBlobOutputStream = new PageBlobOutputStream(blob);
bufferedOutputStream = new BufferedOutputStream(pageBlobOutputStream);

compress = new SnappyOutputStream(bufferedOutputStream, DEFAULT_PART_SIZE_UPLOAD);
IOUtils.copy(inputStream, compress, DEFAULT_PART_SIZE_UPLOAD);

} catch (StorageException | URISyntaxException | IOException e) {
logger.error("Unable to store blob", e);
LOGGER.error("Unable to store blob", e);
} finally {
IOUtils.closeQuietly(compress); // super important that the compress close is called first in order to flush
IOUtils.closeQuietly(bufferedOutputStream);
Expand All @@ -157,77 +168,166 @@ private void uploadFile(CloudBlobContainer container, String fileKey, File sourc

@Override
public void uploadSchema(BackupRestoreContext ctx, String schema) {
// ToDo : Add the upload schema to Azure.
// Path: <backupname/node-id/schema.cql>
final String accountName = ctx.getAccountId();
final String accountKey = ctx.getSecretKey();
final String backupName = ctx.getName();
final String nodeId = ctx.getNodeId();

final String containerName = StringUtils.lowerCase(getContainerName(ctx.getExternalLocation()));
// https://<account_name>.blob.core.windows.net/<container_name>
final CloudBlobContainer container = getCloudBlobContainer(accountName, accountKey, containerName);

if (container == null) {
LOGGER.error("Error uploading schema. Unable to connect to {}, for container {}.",
ctx.getExternalLocation(), containerName);
return;
}

final String key = backupName + "/" + nodeId + "/" + StorageUtil.SCHEMA_FILE;
uploadText(container, key, schema);
}

private void uploadText(CloudBlobContainer container, String fileKey, String text) {
final InputStream inputStream = new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8));
LOGGER.info("Initiating upload for schema | key: {}", fileKey);
uploadStream(container, fileKey, inputStream);
}

@Override
public void download(BackupRestoreContext ctx) throws IOException {
public void download(BackupRestoreContext ctx) throws Exception {

final String accountName = ctx.getAccountId();
final String accountKey = ctx.getSecretKey();
final String localLocation = ctx.getLocalLocation();
final String backupName = ctx.getName();
final String nodeId = ctx.getNodeId();
final File[] keyspaces = getNonSystemKeyspaces(ctx);

final String containerName = StringUtils.lowerCase(getContainerName(ctx.getExternalLocation()));
// https://<account_name>.blob.core.windows.net/<container_name>
final CloudBlobContainer container = getCloudBlobContainer(accountName, accountKey, containerName);

if (container == null) {
logger.error("Error uploading snapshots. Unable to connect to {}, for container {}.",
ctx.getExternalLocation(), containerName, localLocation);
LOGGER.error("Error downloading snapshots. Unable to connect to {}, for container {}.",
ctx.getExternalLocation(), containerName);
return;
}
String keyPrefix = String.format("%s/%s", backupName, nodeId);

final Map<String, Long> snapshotFileKeys = getSnapshotFileKeys(container, keyPrefix);
logger.info("Snapshot files for this node: {}", snapshotFileKeys);
if (Objects.equals(ctx.getRestoreType(), new String("new"))) {
final String keyPrefix = String.format("%s/%s", backupName, nodeId);
final Map<String, Long> snapshotFileKeys = getSnapshotFileKeys(container, keyPrefix);
LOGGER.info("Snapshot files for this node: {}", snapshotFileKeys);
for (String fileKey : snapshotFileKeys.keySet()) {
downloadFile(container, fileKey, snapshotFileKeys.get(fileKey), localLocation + File.separator + fileKey);
}
} else {
for (File keyspace : keyspaces) {
for (File cfDir : getColumnFamilyDir(keyspace)) {
final String columnFamily = cfDir.getName().substring(0, cfDir.getName().indexOf("-"));
final Map<String, Long> snapshotFileKeys = getSnapshotFileKeys(container,
backupName + "/" + nodeId + "/" + keyspace.getName() + "/" + columnFamily);
for (String fileKey : snapshotFileKeys.keySet()) {
final String destinationFile = cfDir.getAbsolutePath() + fileKey.substring(fileKey.lastIndexOf("/"));
downloadFile(container, fileKey, snapshotFileKeys.get(fileKey), destinationFile);
LOGGER.info("Keyspace {}, Column Family {}, FileKey {}, destination {}", keyspace, columnFamily, fileKey, destinationFile);
}
}
}
}
}

private void downloadFile(CloudBlobContainer container,
String fileKey,
long originalSize,
String destinationFile) throws Exception {
try {
final File snapshotFile = new File(destinationFile);
// Only create parent directory once, if it doesn't exist.
final File parentDir = new File(snapshotFile.getParent());
if (!parentDir.isDirectory()) {
final boolean parentDirCreated = parentDir.mkdirs();
if (!parentDirCreated) {
LOGGER.error(
"Error creating parent directory for file: {}. Skipping to next",
destinationFile);
return;
}
}

snapshotFile.createNewFile();

InputStream inputStream = null;
SnappyInputStream compress = null;

try (FileOutputStream fileOutputStream = new FileOutputStream(snapshotFile, true);
BufferedOutputStream bos = new BufferedOutputStream(fileOutputStream)) {

for (String fileKey : snapshotFileKeys.keySet()) {
downloadFile(localLocation, container, fileKey, snapshotFileKeys.get(fileKey));
final CloudPageBlob pageBlobReference = container.getPageBlobReference(fileKey);
inputStream = new PageBlobInputStream(pageBlobReference);
compress = new SnappyInputStream(inputStream);

IOUtils.copy(compress, bos, DEFAULT_PART_SIZE_DOWNLOAD);
} finally {
IOUtils.closeQuietly(compress);
IOUtils.closeQuietly(inputStream);
}
} catch (Exception e) {
LOGGER.error("Error downloading the file {} : {}", destinationFile, e);
throw new Exception(e);
}
}

private void downloadFile(String localLocation, CloudBlobContainer container, String fileKey, long originalSize) {
private File[] getNonSystemKeyspaces(BackupRestoreContext ctx) {
File file = new File(ctx.getLocalLocation());
File[] directories = file.listFiles(
(current, name) -> new File(current, name).isDirectory() &&
name.compareTo("system") != 0);
return directories;
}

private static File[] getColumnFamilyDir(File keyspace) {
return keyspace.listFiles(
(current, name) -> new File(current, name).isDirectory());
}

logger.info("Downloading | Local location {} | fileKey: {} | Size: {}", localLocation, fileKey, originalSize);
@Override
public String downloadSchema(BackupRestoreContext ctx) throws Exception {
final String accountName = ctx.getAccountId();
final String accountKey = ctx.getSecretKey();
final String backupName = ctx.getName();
final String nodeId = ctx.getNodeId();

final String fileLocation = localLocation + File.separator + fileKey;
File file = new File(fileLocation);
// Only create parent directory once, if it doesn't exist.
if (!createParentDir(file)) {
logger.error("Unable to create parent directories!");
return;
final String containerName = StringUtils.lowerCase(getContainerName(ctx.getExternalLocation()));
// https://<account_name>.blob.core.windows.net/<container_name>
final CloudBlobContainer container = getCloudBlobContainer(accountName, accountKey, containerName);

if (container == null) {
LOGGER.error("Error downloading snapshots. Unable to connect to {}, for container {}.",
ctx.getExternalLocation(), containerName);
return new String("");
}

final String key = backupName + "/" + nodeId + "/" + StorageUtil.SCHEMA_FILE;

InputStream inputStream = null;
SnappyInputStream compress = null;

try (
FileOutputStream fileOutputStream = new FileOutputStream(file, true);
BufferedOutputStream bos = new BufferedOutputStream(fileOutputStream)) {

final CloudPageBlob pageBlobReference = container.getPageBlobReference(fileKey);

try {
final CloudPageBlob pageBlobReference = container.getPageBlobReference(key);
inputStream = new PageBlobInputStream(pageBlobReference);
compress = new SnappyInputStream(inputStream);

IOUtils.copy(compress, bos, DEFAULT_PART_SIZE_DOWNLOAD);
return IOUtils.toString(compress, "UTF-8");

} catch (Exception e) {
logger.error("Unable to write file: {}", fileKey, e);
LOGGER.error("Unable to read schema from: {}", key, e);
return new String("");
} finally {
IOUtils.closeQuietly(compress);
IOUtils.closeQuietly(inputStream);
}
}

@Override
public String downloadSchema(BackupRestoreContext ctx) throws Exception {
// ToDo : Add the download schema to Azure.
return new String("");
}

private String getContainerName(String externalLocation) {
return externalLocation.substring("azure://".length()).replace("/", "");
}
Expand All @@ -247,25 +347,13 @@ private CloudBlobContainer getCloudBlobContainer(String accountName, String acco
container = serviceClient.getContainerReference(containerName);
container.createIfNotExists();
} catch (StorageException | URISyntaxException | InvalidKeyException e) {
logger.error("Error connecting to container for account {} and container name {}", accountName, containerName, e);
LOGGER.error("Error connecting to container for account {} and container name {}", accountName, containerName, e);
}
}

return container;
}

private boolean createParentDir(File file) {
final File parentDir = new File(file.getParent());
if (!parentDir.isDirectory()) {
final boolean parentDirCreated = parentDir.mkdirs();
if (!parentDirCreated) {
logger.error("Error creating parent directory for file: {}. Skipping to next");
return false;
}
}
return true;
}

private Map<String, Long> getSnapshotFileKeys(CloudBlobContainer container, String keyPrefix) {
Map<String, Long> snapshotFiles = new HashMap<>();

Expand All @@ -277,7 +365,7 @@ private Map<String, Long> getSnapshotFileKeys(CloudBlobContainer container, Stri
}
}
} catch (StorageException e) {
logger.error("Unable to retrieve metadata.", e);
LOGGER.error("Unable to retrieve metadata.", e);
// all or none
snapshotFiles = new HashMap<>();
}
Expand All @@ -293,7 +381,7 @@ private long getOriginalFileSize(CloudPageBlob pageBlobReference) throws Storage
try {
size = Long.parseLong(map.get(ORIGINAL_SIZE_KEY));
} catch (Exception e) {
logger.error("File size metadata missing or is not a number.");
LOGGER.error("File size metadata missing or is not a number.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class StorageDriverFactory {
public static BackupStorageDriver createStorageDriver(CassandraTask cassandraTask) {
String externalLocation = null;
switch (cassandraTask.getType()) {
case BACKUP_SNAPSHOT:
case BACKUP_UPLOAD:
externalLocation = ((BackupUploadTask)cassandraTask).getBackupRestoreContext().getExternalLocation();
break;
case BACKUP_SCHEMA:
Expand Down