Skip to content

Commit

Permalink
archiving
Browse files Browse the repository at this point in the history
- removed 7zip support, it was always a poor fit, the 7z executable works on a dir level while Tetrifact handles files.
- hardened the archive generation pipeline so it no longer depends on memcached progress objects being present
  • Loading branch information
shukriadams committed Oct 16, 2024
1 parent 61dee05 commit 89ebc6f
Show file tree
Hide file tree
Showing 20 changed files with 67 additions and 327 deletions.
2 changes: 0 additions & 2 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ RUN apt-get update \
&& apt-get install apt-transport-https -y \
&& apt-get update \
&& apt-get install aspnetcore-runtime-6.0 -y \
&& apt-get install p7zip-full -y \
## clean up
&& rm packages-microsoft-prod.deb \
&& apt-get remove wget -y \
Expand All @@ -28,7 +27,6 @@ USER tetrifact
# set Tetrifact default log level
ENV LOGGING__LOGLEVEL__DEFAULT=Information
ENV LOGGING__LOGLEVEL__Microsoft=Warning
ENV SEVEN_ZIP_BINARY_PATH=/usr/lib/p7zip/7z
ENV ASPNETCORE_URLS=http://*:5000

CMD sh -c 'cd /var/tetrifact && dotnet Tetrifact.Web.dll'
1 change: 0 additions & 1 deletion docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ services:
environment:
# This argument is REQUIRED for Tetrifact to bind properly to the container host port.
ASPNETCORE_URLS : http://*:5000
SEVEN_ZIP_BINARY_PATH: /usr/lib/p7zip/7z
volumes:
- ./data:/var/tetrifact/data/:rw
ports:
Expand Down
9 changes: 0 additions & 9 deletions docs/settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,3 @@ A detailed list of settings are :

(TBD)

#### 7zip

Tetrifact supports 7zip as a compression method for improved performance. Requires setting properies

SevenZipBinaryPath : <path to 7za executable>
DownloadArchiveMode : 7Zip

7za is the only part of 7zip that is required.

271 changes: 47 additions & 224 deletions src/Tetrifact.Core/ArchiveService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public string GetPackageArchivePath(string packageId)

public string GetPackageArchiveQueuePath(string packageId)
{
return Path.Combine(_settings.ArchiveQueuePath, $"{packageId}.json");
// queue files partitioned by iso date to make string sorting easier.
return Path.Combine(_settings.ArchiveQueuePath, $"{DateTime.UtcNow.ToIsoFSFriendly()}_{packageId}.json");
}

public string GetPackageArchiveTempPath(string packageId)
Expand Down Expand Up @@ -142,6 +143,9 @@ public ArchiveProgressInfo GetPackageArchiveStatus(string packageId)

string progressCacheKey = this.GetArchiveProgressKey(packageId);
ArchiveProgressInfo cachedProgress = _cache.Get<ArchiveProgressInfo>(progressCacheKey);
if (cachedProgress == null)
cachedProgress = new ArchiveProgressInfo();

return cachedProgress;
}

Expand Down Expand Up @@ -177,113 +181,6 @@ public void PurgeOldArchives()
}
}

private async Task Archive7Zip(string packageId, string archivePathTemp)
{
// create staging directory
string tempDir1 = Path.Join(_settings.TempPath, $"__repack_{packageId}");
string tempDir2 = Path.Join(_settings.TempPath, $"_repack_{packageId}");

const int bufSize = 6024;

Manifest manifest = _indexReader.GetManifest(packageId);

// copy all files to single Directory
if (!Directory.Exists(tempDir2))
{
_log.LogInformation($"Archive generation : gathering files for package {packageId}");
Directory.CreateDirectory(tempDir1);
long cacheUpdateIncrements = manifest.Files.Count / 100;
long counter = 0;

manifest.Files.AsParallel().WithDegreeOfParallelism(_settings.ArchiveCPUThreads).ForAll(delegate (ManifestItem file)
{
string targetPath = Path.Join(tempDir1, file.Path);
List<string> knownDirectories = new List<string>();
if (manifest.IsCompressed)
{
GetFileResponse fileLookup = _indexReader.GetFile(file.Id);
if (fileLookup == null)
throw new Exception($"Failed to find expected package file {file.Id} - repository is likely corrupt");

using (var storageArchive = new ZipArchive(fileLookup.Content))
{
ZipArchiveEntry storageArchiveEntry = storageArchive.Entries[0];
using (var storageArchiveStream = storageArchiveEntry.Open())
using (FileStream writeStream = new FileStream(targetPath, FileMode.Create))
// copy async not used here because cannot get this delegate to block asParallel,
StreamsHelper.Copy(storageArchiveStream, writeStream, bufSize);
}
}
else
{
GetFileResponse fileLookup = _indexReader.GetFile(file.Id);
if (fileLookup == null)
throw new Exception($"Failed to find expected package file {file.Id}- repository is likely corrupt");

string dir = Path.GetDirectoryName(targetPath);
if (!knownDirectories.Contains(dir))
{
Directory.CreateDirectory(dir);
knownDirectories.Add(dir);
}

// is this the fastest way of copying? benchmark
using (Stream fileStream = fileLookup.Content)
using (FileStream writeStream = new FileStream(targetPath, FileMode.Create))
// copy async not used here because cannot get this delegate to block asParallel,
StreamsHelper.Copy(fileStream, writeStream, bufSize);
}

counter++;

if (cacheUpdateIncrements == 0 || counter % cacheUpdateIncrements == 0)
{
_log.LogInformation($"Gathering file {counter}/{manifest.Files.Count}, package \"{packageId}\".");
string progressCacheKey = this.GetArchiveProgressKey(packageId);
ArchiveProgressInfo progress = _cache.Get<ArchiveProgressInfo>(progressCacheKey);
if (progress != null)
{
progress.FileCopyProgress = ((decimal)counter / (decimal)manifest.Files.Count) * 100;
_cache.Set(progressCacheKey, progress);
}
}
});

Directory.Move(tempDir1, tempDir2);
}

_log.LogInformation($"Archive generation : building archive for package {packageId}");

// force delete temp file if it already exists, this can sometimes fail and we want an exception to be thrown to block 7zip being called.
// if 7zip encounted
if (_fileSystem.File.Exists(archivePathTemp))
_fileSystem.File.Delete(archivePathTemp);

DateTime compressStart = DateTime.Now;

// ensure bin path exists
if (!_fileSystem.File.Exists(_settings.ExternaArchivingExecutable))
throw new Exception($"7zip binary not found at specified path \"{_settings.ExternaArchivingExecutable}\".");

_log.LogInformation($"Invoking 7z archive generation for package \"{packageId}\".");

// -aoa swtich forces overwriting of existing zip file should it exist
string command = $"{_settings.ExternaArchivingExecutable} -aoa a -tzip -mx={_settings.ArchiveCPUThreads} -mmt=on {archivePathTemp} {tempDir2}/*";
ShellResult result = Shell.Run(command, false, 3600000); // set timeout to 1 hour
TimeSpan compressTaken = DateTime.Now - compressStart;

if (result.ExitCode == 0)
{
_log.LogInformation($"Archive comression with 7zip complete, took {Math.Round(compressTaken.TotalSeconds, 0)} seconds.");
if (result.StdErr.Any())
_log.LogError($"Archive comression with 7zip succeeded, but with errors. Took {Math.Round(compressTaken.TotalSeconds, 0)} seconds. {string.Join("", result.StdErr)}");
}
else
{
_log.LogError($"Archive comression with 7zip failed, took {Math.Round(compressTaken.TotalSeconds, 0)} seconds. {string.Join("", result.StdErr)}");
}
}

private async Task ArchiveDotNetZip(string packageId, string archivePathTemp)
{
DateTime compressStart = DateTime.Now;
Expand Down Expand Up @@ -334,7 +231,7 @@ private async Task ArchiveDotNetZip(string packageId, string archivePathTemp)
}

TimeSpan compressTaken = DateTime.Now - compressStart;
_log.LogInformation($"Archive comression with default dotnet ZipArchive complete, took {Math.Round(compressTaken.TotalSeconds, 0)} seconds.");
_log.LogInformation($"Archive compression with default dotnet ZipArchive complete, took {Math.Round(compressTaken.TotalSeconds, 0)} seconds.");
}

public async Task CreateNextQueuedArchive()
Expand All @@ -343,59 +240,52 @@ public async Task CreateNextQueuedArchive()
string progressCacheKey = null;
ArchiveProgressInfo progress = null;

foreach (string queuedFile in _fileSystem.Directory.GetFiles(_settings.ArchiveQueuePath))
{
_log.LogInformation($"Processing archive generation for \"{queuedFile}\".");
string queueFileContent = string.Empty;
try
{
queueFileContent = _fileSystem.File.ReadAllText(queuedFile);
archiveQueueInfo = JsonConvert.DeserializeObject<ArchiveQueueInfo>(queueFileContent);
}
catch (Exception ex)
{
_log.LogError($"Corrupt queue file {queuedFile}, content is \n\n{queueFileContent}\n\n. Error is: {ex}. Force deleting queued file.");
try
{
_fileSystem.File.Delete(queuedFile);
}
catch (Exception ex2)
{
_log.LogError($"Failed to delete corrupt queue file {queuedFile}. Error is: {ex2}.");
}
continue;
}

progressCacheKey = this.GetArchiveProgressKey(archiveQueueInfo.PackageId);
progress = _cache.Get<ArchiveProgressInfo>(progressCacheKey);
if (progress == null)
{
_log.LogError($"Progress object not found for archive generation package {archiveQueueInfo.PackageId}, this should not happen.");
continue;
}

if (progress.State == PackageArchiveCreationStates.Queued)
break;
else
{
// force null, this var is used as flag to determine if we have anything to process
progress = null;
continue;
}
}

// nothing queued, exit normally
if (progress == null)
return;

string queuedFile = _fileSystem.Directory.GetFiles(_settings.ArchiveQueuePath).OrderByDescending(f => f).FirstOrDefault();
if (queuedFile == null)
return;

_log.LogInformation($"Processing archive generation for \"{queuedFile}\".");
string queueFileContent = string.Empty;

try
{
queueFileContent = _fileSystem.File.ReadAllText(queuedFile);
archiveQueueInfo = JsonConvert.DeserializeObject<ArchiveQueueInfo>(queueFileContent);
}
catch (Exception ex)
{
_log.LogError($"Corrupt queue file {queuedFile}, content is \n\n{queueFileContent}\n\n. Error is: {ex}. Force deleting queued file.");
try
{
_fileSystem.File.Delete(queuedFile);
}
catch (Exception ex2)
{
_log.LogError($"Failed to delete corrupt queue file {queuedFile}. Error is: {ex2}.");
}
return;
}

progressCacheKey = this.GetArchiveProgressKey(archiveQueueInfo.PackageId);
progress = _cache.Get<ArchiveProgressInfo>(progressCacheKey);
if (progress == null)
progress = new ArchiveProgressInfo
{
PackageId = archiveQueueInfo.PackageId,
QueuedUtc = archiveQueueInfo.QueuedUtc
};

progress.State = PackageArchiveCreationStates.ArchiveGenerating;
progress.StartedUtc = DateTime.UtcNow;
_cache.Set(progressCacheKey, progress);

await this.CreateArchive(archiveQueueInfo.PackageId);

progress.State = PackageArchiveCreationStates.Processed_CleanupRequired;
_cache.Set(progressCacheKey, progress);
_cache.Set(progressCacheKey, progress);

// finally, cleanup queue file
_fileSystem.File.Delete(queuedFile);
}

public async Task CreateArchive(string packageId)
Expand Down Expand Up @@ -424,10 +314,7 @@ public async Task CreateArchive(string packageId)

try
{
if (_settings.ArchivingMode == ArchivingModes.SevenZip)
await Archive7Zip(packageId, archivePathTemp);
else
await ArchiveDotNetZip(packageId, archivePathTemp);
await ArchiveDotNetZip(packageId, archivePathTemp);

// flip temp file to final path, it is ready for use only when this happens
_fileSystem.File.Move(archivePathTemp, archivePath);
Expand All @@ -436,78 +323,14 @@ public async Task CreateArchive(string packageId)
}
catch(Exception ex)
{
_log.LogError($"Package archive for {packageId} failed unexpectedly with {ex}");
_log.LogError($"Package archive for {packageId} failed unexpectedly with {ex}.");
}
finally
{
_lock.Unlock(archivePathTemp);
}
}

public void CleanupNextQueuedArchive()
{
ArchiveQueueInfo archiveQueueInfo = null;
string progressKey = null;
ArchiveProgressInfo progress = null;
string queueFile = null;

foreach (string queuedFile in _fileSystem.Directory.GetFiles(_settings.ArchiveQueuePath))
{
queueFile = queuedFile;

string queueFileContent = string.Empty;
try
{
queueFileContent = _fileSystem.File.ReadAllText(queuedFile);
archiveQueueInfo = JsonConvert.DeserializeObject<ArchiveQueueInfo>(queueFileContent);
}
catch (Exception ex)
{
_log.LogError($"Corrupt queue file {queuedFile}, content is \n\n{queueFileContent}\n\n. Error is: {ex}. Force deleting queued file.");
try
{
_fileSystem.File.Delete(queuedFile);
}
catch (Exception ex2)
{
_log.LogError($"Failed to delete corrupt queue file {queuedFile}. Error is: {ex2}.");
}
continue;
}

progressKey = this.GetArchiveProgressKey(archiveQueueInfo.PackageId);
progress = _cache.Get<ArchiveProgressInfo>(progressKey);
if (progress == null)
{
_log.LogError($"Progress object not found for archive generation package {archiveQueueInfo.PackageId}, this should not happen.");
continue;
}

if (progress.State == PackageArchiveCreationStates.Processed_CleanupRequired)
break;
else
{
// force null, this var is used as flag to determine if we have anything to process
progress = null;
continue;
}
}

// nothing queued, exit normally
if (progress == null)
return;

// cleanup
string tempDir2 = Path.Join(_settings.TempPath, $"_repack_{archiveQueueInfo.PackageId}");
if (_fileSystem.Directory.Exists(tempDir2))
_fileSystem.Directory.Delete(tempDir2, true);

if (_fileSystem.File.Exists(queueFile))
_fileSystem.File.Delete(queueFile);

_cache.Remove(progressKey);
}

#endregion
}
}
3 changes: 1 addition & 2 deletions src/Tetrifact.Core/ArchivingModes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
{
public enum ArchivingModes
{
Default, // internal dotnet zip compressio
SevenZip // seven zip. requires external 7zip binary. 7zip support is still experimental.
Default // internal dotnet zip compressio
}
}
Loading

0 comments on commit 89ebc6f

Please sign in to comment.