From f094fb422102be89874f17a7dc38b730779f60dc Mon Sep 17 00:00:00 2001 From: Eric Rozell Date: Tue, 6 Jun 2017 20:01:36 -0400 Subject: [PATCH 1/4] Bug in storage driver resolution for backup upload The switch for the createStorageDriver call in StorageDriverFactory was missing the case for the backup upload task type. Fixes #446 --- .../dcos/cassandra/executor/backup/StorageDriverFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cassandra-executor/src/main/java/com/mesosphere/dcos/cassandra/executor/backup/StorageDriverFactory.java b/cassandra-executor/src/main/java/com/mesosphere/dcos/cassandra/executor/backup/StorageDriverFactory.java index 875eba93..4ea12d0a 100644 --- a/cassandra-executor/src/main/java/com/mesosphere/dcos/cassandra/executor/backup/StorageDriverFactory.java +++ b/cassandra-executor/src/main/java/com/mesosphere/dcos/cassandra/executor/backup/StorageDriverFactory.java @@ -18,7 +18,7 @@ public class StorageDriverFactory { public static BackupStorageDriver createStorageDriver(CassandraTask cassandraTask) { String externalLocation = null; switch (cassandraTask.getType()) { - case BACKUP_SNAPSHOT: + case BACKUP_UPLOAD: externalLocation = ((BackupUploadTask)cassandraTask).getBackupRestoreContext().getExternalLocation(); break; case BACKUP_SCHEMA: From 4f9b7c6f1de85f0aef54aaa93cf96e293f813f27 Mon Sep 17 00:00:00 2001 From: Eric Rozell Date: Tue, 6 Jun 2017 21:13:51 -0400 Subject: [PATCH 2/4] Fix logging for AzureStorageDriver The static initializer for AzureStorageDriver does not work as expected. Changing to be consistent with how the logger is resolved in other files. Fixes #447 --- .../executor/backup/AzureStorageDriver.java | 46 +++++++++---------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/cassandra-executor/src/main/java/com/mesosphere/dcos/cassandra/executor/backup/AzureStorageDriver.java b/cassandra-executor/src/main/java/com/mesosphere/dcos/cassandra/executor/backup/AzureStorageDriver.java index 6616a8a8..7535de59 100644 --- a/cassandra-executor/src/main/java/com/mesosphere/dcos/cassandra/executor/backup/AzureStorageDriver.java +++ b/cassandra-executor/src/main/java/com/mesosphere/dcos/cassandra/executor/backup/AzureStorageDriver.java @@ -41,8 +41,8 @@ */ public class AzureStorageDriver implements BackupStorageDriver { - private final Logger logger = LoggerFactory.getLogger(getClass()); - +private static final Logger LOGGER = LoggerFactory.getLogger( + AzureStorageDriver.class); private static final int DEFAULT_PART_SIZE_UPLOAD = 4 * 1024 * 1024; // Chunk size set to 4MB private static final int DEFAULT_PART_SIZE_DOWNLOAD = 4 * 1024 * 1024; // Chunk size set to 4MB @@ -62,7 +62,7 @@ public void upload(BackupRestoreContext ctx) throws IOException { final File dataDirectory = new File(localLocation); if (container == null || !dataDirectory.isDirectory()) { - logger.error("Error uploading snapshots. Unable to connect to {}, for container {} or Directory {} doesn't exist.", + LOGGER.error("Error uploading snapshots. Unable to connect to {}, for container {} or Directory {} doesn't exist.", ctx.getExternalLocation(), containerName, localLocation); return; } @@ -74,38 +74,38 @@ public void upload(BackupRestoreContext ctx) throws IOException { // Only enter keyspace directory. continue; } - logger.info("Entering keyspace: {}", keyspaceDir.getName()); + LOGGER.info("Entering keyspace: {}", keyspaceDir.getName()); for (File cfDir : keyspaceDir.listFiles()) { - logger.info("Entering column family: {}", cfDir.getName()); + LOGGER.info("Entering column family: {}", cfDir.getName()); File snapshotDir = new File(cfDir, "snapshots"); File backupDir = new File(snapshotDir, backupName); if (!StorageUtil.isValidBackupDir(keyspaceDir, cfDir, snapshotDir, backupDir)) { - logger.info("Skipping directory: {}", snapshotDir.getAbsolutePath()); + LOGGER.info("Skipping directory: {}", snapshotDir.getAbsolutePath()); continue; } - logger.info( + LOGGER.info( "Valid backup directories. KeyspaceDir: {} | ColumnFamilyDir: {} | SnapshotDir: {} | BackupName: {}", keyspaceDir.getAbsolutePath(), cfDir.getAbsolutePath(), snapshotDir.getAbsolutePath(), backupName); final Optional snapshotDirectory = StorageUtil.getValidSnapshotDirectory(snapshotDir, backupName); - logger.info("Valid snapshot directory: {}", snapshotDirectory.isPresent()); + LOGGER.info("Valid snapshot directory: {}", snapshotDirectory.isPresent()); if (snapshotDirectory.isPresent()) { - logger.info("Going to upload directory: {}", snapshotDirectory.get().getAbsolutePath()); + LOGGER.info("Going to upload directory: {}", snapshotDirectory.get().getAbsolutePath()); uploadDirectory(snapshotDirectory.get().getAbsolutePath(), container, containerName, key, keyspaceDir.getName(), cfDir.getName()); } else { - logger.warn( + LOGGER.warn( "Snapshots directory: {} doesn't contain the current backup directory: {}", snapshotDir.getName(), backupName); } } } - logger.info("Done uploading snapshots for backup: {}", backupName); + LOGGER.info("Done uploading snapshots for backup: {}", backupName); } private void uploadDirectory(String localLocation, @@ -115,7 +115,7 @@ private void uploadDirectory(String localLocation, String keyspaceName, String cfName) throws IOException { - logger.info( + LOGGER.info( "uploadDirectory() localLocation: {}, containerName: {}, key: {}, keyspaceName: {}, cfName: {}", localLocation, containerName, key, keyspaceName, cfName); @@ -136,7 +136,7 @@ private void uploadFile(CloudBlobContainer container, String fileKey, File sourc BufferedOutputStream bufferedOutputStream = null; try (BufferedInputStream inputStream = new BufferedInputStream(new FileInputStream(sourceFile))) { - logger.info("Initiating upload for file: {} | key: {}", + LOGGER.info("Initiating upload for file: {} | key: {}", sourceFile.getAbsolutePath(), fileKey); final CloudPageBlob blob = container.getPageBlobReference(fileKey); @@ -147,7 +147,7 @@ private void uploadFile(CloudBlobContainer container, String fileKey, File sourc IOUtils.copy(inputStream, compress, DEFAULT_PART_SIZE_UPLOAD); } catch (StorageException | URISyntaxException | IOException e) { - logger.error("Unable to store blob", e); + LOGGER.error("Unable to store blob", e); } finally { IOUtils.closeQuietly(compress); // super important that the compress close is called first in order to flush IOUtils.closeQuietly(bufferedOutputStream); @@ -175,14 +175,14 @@ public void download(BackupRestoreContext ctx) throws IOException { final CloudBlobContainer container = getCloudBlobContainer(accountName, accountKey, containerName); if (container == null) { - logger.error("Error uploading snapshots. Unable to connect to {}, for container {}.", + LOGGER.error("Error uploading snapshots. Unable to connect to {}, for container {}.", ctx.getExternalLocation(), containerName, localLocation); return; } String keyPrefix = String.format("%s/%s", backupName, nodeId); final Map snapshotFileKeys = getSnapshotFileKeys(container, keyPrefix); - logger.info("Snapshot files for this node: {}", snapshotFileKeys); + LOGGER.info("Snapshot files for this node: {}", snapshotFileKeys); for (String fileKey : snapshotFileKeys.keySet()) { downloadFile(localLocation, container, fileKey, snapshotFileKeys.get(fileKey)); @@ -191,13 +191,13 @@ public void download(BackupRestoreContext ctx) throws IOException { private void downloadFile(String localLocation, CloudBlobContainer container, String fileKey, long originalSize) { - logger.info("Downloading | Local location {} | fileKey: {} | Size: {}", localLocation, fileKey, originalSize); + LOGGER.info("Downloading | Local location {} | fileKey: {} | Size: {}", localLocation, fileKey, originalSize); final String fileLocation = localLocation + File.separator + fileKey; File file = new File(fileLocation); // Only create parent directory once, if it doesn't exist. if (!createParentDir(file)) { - logger.error("Unable to create parent directories!"); + LOGGER.error("Unable to create parent directories!"); return; } @@ -215,7 +215,7 @@ private void downloadFile(String localLocation, CloudBlobContainer container, St IOUtils.copy(compress, bos, DEFAULT_PART_SIZE_DOWNLOAD); } catch (Exception e) { - logger.error("Unable to write file: {}", fileKey, e); + LOGGER.error("Unable to write file: {}", fileKey, e); } finally { IOUtils.closeQuietly(compress); IOUtils.closeQuietly(inputStream); @@ -247,7 +247,7 @@ private CloudBlobContainer getCloudBlobContainer(String accountName, String acco container = serviceClient.getContainerReference(containerName); container.createIfNotExists(); } catch (StorageException | URISyntaxException | InvalidKeyException e) { - logger.error("Error connecting to container for account {} and container name {}", accountName, containerName, e); + LOGGER.error("Error connecting to container for account {} and container name {}", accountName, containerName, e); } } @@ -259,7 +259,7 @@ private boolean createParentDir(File file) { if (!parentDir.isDirectory()) { final boolean parentDirCreated = parentDir.mkdirs(); if (!parentDirCreated) { - logger.error("Error creating parent directory for file: {}. Skipping to next"); + LOGGER.error("Error creating parent directory for file: {}. Skipping to next"); return false; } } @@ -277,7 +277,7 @@ private Map getSnapshotFileKeys(CloudBlobContainer container, Stri } } } catch (StorageException e) { - logger.error("Unable to retrieve metadata.", e); + LOGGER.error("Unable to retrieve metadata.", e); // all or none snapshotFiles = new HashMap<>(); } @@ -293,7 +293,7 @@ private long getOriginalFileSize(CloudPageBlob pageBlobReference) throws Storage try { size = Long.parseLong(map.get(ORIGINAL_SIZE_KEY)); } catch (Exception e) { - logger.error("File size metadata missing or is not a number."); + LOGGER.error("File size metadata missing or is not a number."); } } From 6bb0ee6355724d631154a225dded86d8dc5b4bfb Mon Sep 17 00:00:00 2001 From: Eric Rozell Date: Wed, 7 Jun 2017 10:19:44 -0400 Subject: [PATCH 3/4] Add schema upload/download support to AzureStorageDriver Uses the CloudBlobContainer abstraction to write the schema to a well-known path. Fixes #450 --- .../executor/backup/AzureStorageDriver.java | 83 ++++++++++++++++--- 1 file changed, 73 insertions(+), 10 deletions(-) diff --git a/cassandra-executor/src/main/java/com/mesosphere/dcos/cassandra/executor/backup/AzureStorageDriver.java b/cassandra-executor/src/main/java/com/mesosphere/dcos/cassandra/executor/backup/AzureStorageDriver.java index 7535de59..d42e4f39 100644 --- a/cassandra-executor/src/main/java/com/mesosphere/dcos/cassandra/executor/backup/AzureStorageDriver.java +++ b/cassandra-executor/src/main/java/com/mesosphere/dcos/cassandra/executor/backup/AzureStorageDriver.java @@ -18,12 +18,14 @@ import java.io.BufferedInputStream; import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; import java.nio.file.FileSystems; import java.nio.file.Files; import java.security.InvalidKeyException; @@ -131,13 +133,22 @@ private void uploadDirectory(String localLocation, private void uploadFile(CloudBlobContainer container, String fileKey, File sourceFile) { - PageBlobOutputStream pageBlobOutputStream = null; - SnappyOutputStream compress = null; - BufferedOutputStream bufferedOutputStream = null; try (BufferedInputStream inputStream = new BufferedInputStream(new FileInputStream(sourceFile))) { LOGGER.info("Initiating upload for file: {} | key: {}", sourceFile.getAbsolutePath(), fileKey); + uploadStream(container, fileKey, inputStream); + } catch (IOException e) { + LOGGER.error("Unable to store blob", e); + } + } + + private void uploadStream(CloudBlobContainer container, String fileKey, InputStream inputStream) { + + PageBlobOutputStream pageBlobOutputStream = null; + SnappyOutputStream compress = null; + BufferedOutputStream bufferedOutputStream = null; + try { final CloudPageBlob blob = container.getPageBlobReference(fileKey); pageBlobOutputStream = new PageBlobOutputStream(blob); @@ -145,7 +156,6 @@ private void uploadFile(CloudBlobContainer container, String fileKey, File sourc compress = new SnappyOutputStream(bufferedOutputStream, DEFAULT_PART_SIZE_UPLOAD); IOUtils.copy(inputStream, compress, DEFAULT_PART_SIZE_UPLOAD); - } catch (StorageException | URISyntaxException | IOException e) { LOGGER.error("Unable to store blob", e); } finally { @@ -157,8 +167,29 @@ private void uploadFile(CloudBlobContainer container, String fileKey, File sourc @Override public void uploadSchema(BackupRestoreContext ctx, String schema) { - // ToDo : Add the upload schema to Azure. - // Path: + final String accountName = ctx.getAccountId(); + final String accountKey = ctx.getSecretKey(); + final String backupName = ctx.getName(); + final String nodeId = ctx.getNodeId(); + + final String containerName = StringUtils.lowerCase(getContainerName(ctx.getExternalLocation())); + // https://.blob.core.windows.net/ + final CloudBlobContainer container = getCloudBlobContainer(accountName, accountKey, containerName); + + if (container == null) { + LOGGER.error("Error uploading schema. Unable to connect to {}, for container {}.", + ctx.getExternalLocation(), containerName); + return; + } + + final String key = backupName + "/" + nodeId + "/" + StorageUtil.SCHEMA_FILE; + uploadText(container, key, schema); + } + + private void uploadText(CloudBlobContainer container, String fileKey, String text) { + final InputStream inputStream = new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8)); + LOGGER.info("Initiating upload for schema | key: {}", fileKey); + uploadStream(container, fileKey, inputStream); } @Override @@ -175,8 +206,8 @@ public void download(BackupRestoreContext ctx) throws IOException { final CloudBlobContainer container = getCloudBlobContainer(accountName, accountKey, containerName); if (container == null) { - LOGGER.error("Error uploading snapshots. Unable to connect to {}, for container {}.", - ctx.getExternalLocation(), containerName, localLocation); + LOGGER.error("Error downloading snapshots. Unable to connect to {}, for container {}.", + ctx.getExternalLocation(), containerName); return; } String keyPrefix = String.format("%s/%s", backupName, nodeId); @@ -224,8 +255,40 @@ private void downloadFile(String localLocation, CloudBlobContainer container, St @Override public String downloadSchema(BackupRestoreContext ctx) throws Exception { - // ToDo : Add the download schema to Azure. - return new String(""); + final String accountName = ctx.getAccountId(); + final String accountKey = ctx.getSecretKey(); + final String backupName = ctx.getName(); + final String nodeId = ctx.getNodeId(); + + final String containerName = StringUtils.lowerCase(getContainerName(ctx.getExternalLocation())); + // https://.blob.core.windows.net/ + final CloudBlobContainer container = getCloudBlobContainer(accountName, accountKey, containerName); + + if (container == null) { + LOGGER.error("Error downloading snapshots. Unable to connect to {}, for container {}.", + ctx.getExternalLocation(), containerName); + return new String(""); + } + + final String key = backupName + "/" + nodeId + "/" + StorageUtil.SCHEMA_FILE; + + InputStream inputStream = null; + SnappyInputStream compress = null; + + try { + final CloudPageBlob pageBlobReference = container.getPageBlobReference(key); + inputStream = new PageBlobInputStream(pageBlobReference); + compress = new SnappyInputStream(inputStream); + + return IOUtils.toString(compress, "UTF-8"); + + } catch (Exception e) { + LOGGER.error("Unable to read schema from: {}", key, e); + return new String(""); + } finally { + IOUtils.closeQuietly(compress); + IOUtils.closeQuietly(inputStream); + } } private String getContainerName(String externalLocation) { From 358be217a0b7f99f06c4661f7eb973e392d47eba Mon Sep 17 00:00:00 2001 From: Eric Rozell Date: Wed, 7 Jun 2017 16:16:03 -0400 Subject: [PATCH 4/4] Fix download behavior for AzureStorageDriver AzureStorageDriver downloads to a different location from the S3StorageDriver. This makes the download behavior more consistent. Fixes #449 --- .../executor/backup/AzureStorageDriver.java | 113 +++++++++++------- 1 file changed, 69 insertions(+), 44 deletions(-) diff --git a/cassandra-executor/src/main/java/com/mesosphere/dcos/cassandra/executor/backup/AzureStorageDriver.java b/cassandra-executor/src/main/java/com/mesosphere/dcos/cassandra/executor/backup/AzureStorageDriver.java index d42e4f39..9d484602 100644 --- a/cassandra-executor/src/main/java/com/mesosphere/dcos/cassandra/executor/backup/AzureStorageDriver.java +++ b/cassandra-executor/src/main/java/com/mesosphere/dcos/cassandra/executor/backup/AzureStorageDriver.java @@ -31,6 +31,7 @@ import java.security.InvalidKeyException; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.Optional; import static com.mesosphere.dcos.cassandra.executor.backup.azure.PageBlobOutputStream.ORIGINAL_SIZE_KEY; @@ -77,7 +78,7 @@ public void upload(BackupRestoreContext ctx) throws IOException { continue; } LOGGER.info("Entering keyspace: {}", keyspaceDir.getName()); - for (File cfDir : keyspaceDir.listFiles()) { + for (File cfDir : getColumnFamilyDir(keyspaceDir)) { LOGGER.info("Entering column family: {}", cfDir.getName()); File snapshotDir = new File(cfDir, "snapshots"); File backupDir = new File(snapshotDir, backupName); @@ -193,13 +194,14 @@ private void uploadText(CloudBlobContainer container, String fileKey, String tex } @Override - public void download(BackupRestoreContext ctx) throws IOException { + public void download(BackupRestoreContext ctx) throws Exception { final String accountName = ctx.getAccountId(); final String accountKey = ctx.getSecretKey(); final String localLocation = ctx.getLocalLocation(); final String backupName = ctx.getName(); final String nodeId = ctx.getNodeId(); + final File[] keyspaces = getNonSystemKeyspaces(ctx); final String containerName = StringUtils.lowerCase(getContainerName(ctx.getExternalLocation())); // https://.blob.core.windows.net/ @@ -210,49 +212,84 @@ public void download(BackupRestoreContext ctx) throws IOException { ctx.getExternalLocation(), containerName); return; } - String keyPrefix = String.format("%s/%s", backupName, nodeId); - final Map snapshotFileKeys = getSnapshotFileKeys(container, keyPrefix); - LOGGER.info("Snapshot files for this node: {}", snapshotFileKeys); - - for (String fileKey : snapshotFileKeys.keySet()) { - downloadFile(localLocation, container, fileKey, snapshotFileKeys.get(fileKey)); + if (Objects.equals(ctx.getRestoreType(), new String("new"))) { + final String keyPrefix = String.format("%s/%s", backupName, nodeId); + final Map snapshotFileKeys = getSnapshotFileKeys(container, keyPrefix); + LOGGER.info("Snapshot files for this node: {}", snapshotFileKeys); + for (String fileKey : snapshotFileKeys.keySet()) { + downloadFile(container, fileKey, snapshotFileKeys.get(fileKey), localLocation + File.separator + fileKey); + } + } else { + for (File keyspace : keyspaces) { + for (File cfDir : getColumnFamilyDir(keyspace)) { + final String columnFamily = cfDir.getName().substring(0, cfDir.getName().indexOf("-")); + final Map snapshotFileKeys = getSnapshotFileKeys(container, + backupName + "/" + nodeId + "/" + keyspace.getName() + "/" + columnFamily); + for (String fileKey : snapshotFileKeys.keySet()) { + final String destinationFile = cfDir.getAbsolutePath() + fileKey.substring(fileKey.lastIndexOf("/")); + downloadFile(container, fileKey, snapshotFileKeys.get(fileKey), destinationFile); + LOGGER.info("Keyspace {}, Column Family {}, FileKey {}, destination {}", keyspace, columnFamily, fileKey, destinationFile); + } + } + } } } - private void downloadFile(String localLocation, CloudBlobContainer container, String fileKey, long originalSize) { - - LOGGER.info("Downloading | Local location {} | fileKey: {} | Size: {}", localLocation, fileKey, originalSize); + private void downloadFile(CloudBlobContainer container, + String fileKey, + long originalSize, + String destinationFile) throws Exception { + try { + final File snapshotFile = new File(destinationFile); + // Only create parent directory once, if it doesn't exist. + final File parentDir = new File(snapshotFile.getParent()); + if (!parentDir.isDirectory()) { + final boolean parentDirCreated = parentDir.mkdirs(); + if (!parentDirCreated) { + LOGGER.error( + "Error creating parent directory for file: {}. Skipping to next", + destinationFile); + return; + } + } - final String fileLocation = localLocation + File.separator + fileKey; - File file = new File(fileLocation); - // Only create parent directory once, if it doesn't exist. - if (!createParentDir(file)) { - LOGGER.error("Unable to create parent directories!"); - return; - } + snapshotFile.createNewFile(); - InputStream inputStream = null; - SnappyInputStream compress = null; + InputStream inputStream = null; + SnappyInputStream compress = null; - try ( - FileOutputStream fileOutputStream = new FileOutputStream(file, true); - BufferedOutputStream bos = new BufferedOutputStream(fileOutputStream)) { + try (FileOutputStream fileOutputStream = new FileOutputStream(snapshotFile, true); + BufferedOutputStream bos = new BufferedOutputStream(fileOutputStream)) { - final CloudPageBlob pageBlobReference = container.getPageBlobReference(fileKey); - inputStream = new PageBlobInputStream(pageBlobReference); - compress = new SnappyInputStream(inputStream); - - IOUtils.copy(compress, bos, DEFAULT_PART_SIZE_DOWNLOAD); + final CloudPageBlob pageBlobReference = container.getPageBlobReference(fileKey); + inputStream = new PageBlobInputStream(pageBlobReference); + compress = new SnappyInputStream(inputStream); + IOUtils.copy(compress, bos, DEFAULT_PART_SIZE_DOWNLOAD); + } finally { + IOUtils.closeQuietly(compress); + IOUtils.closeQuietly(inputStream); + } } catch (Exception e) { - LOGGER.error("Unable to write file: {}", fileKey, e); - } finally { - IOUtils.closeQuietly(compress); - IOUtils.closeQuietly(inputStream); + LOGGER.error("Error downloading the file {} : {}", destinationFile, e); + throw new Exception(e); } } + private File[] getNonSystemKeyspaces(BackupRestoreContext ctx) { + File file = new File(ctx.getLocalLocation()); + File[] directories = file.listFiles( + (current, name) -> new File(current, name).isDirectory() && + name.compareTo("system") != 0); + return directories; + } + + private static File[] getColumnFamilyDir(File keyspace) { + return keyspace.listFiles( + (current, name) -> new File(current, name).isDirectory()); + } + @Override public String downloadSchema(BackupRestoreContext ctx) throws Exception { final String accountName = ctx.getAccountId(); @@ -317,18 +354,6 @@ private CloudBlobContainer getCloudBlobContainer(String accountName, String acco return container; } - private boolean createParentDir(File file) { - final File parentDir = new File(file.getParent()); - if (!parentDir.isDirectory()) { - final boolean parentDirCreated = parentDir.mkdirs(); - if (!parentDirCreated) { - LOGGER.error("Error creating parent directory for file: {}. Skipping to next"); - return false; - } - } - return true; - } - private Map getSnapshotFileKeys(CloudBlobContainer container, String keyPrefix) { Map snapshotFiles = new HashMap<>();