Skip to content

Commit

Permalink
feat: can upload to git lfs
Browse files Browse the repository at this point in the history
  • Loading branch information
ccrutchf committed Nov 30, 2024
1 parent 09f35e4 commit 7485110
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 171 deletions.
1 change: 0 additions & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
"request": "launch",
"name": "Launch",
"program": "${workspaceFolder}/target/debug/git-lfs-synology",
"cwd": "${workspaceFolder}/../git-lfs-test"
}
]
}
5 changes: 1 addition & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ aead = { version = "0.5.2", features = ["std"] }
aes-gcm = "0.10.3"
anyhow = "1.0.93"
app_dirs2 = "2.5.5"
async-stream = "0.3.6"
clap = "4.5.21"
educe = "0.6.0"
futures-macro = "0.3.31"
Expand All @@ -18,12 +17,11 @@ keyring = { version = "3.6.1", features = ["apple-native", "windows-native", "sy
named-lock = "0.4.1"
num-derive = "0.4.2"
num-traits = "0.2.19"
reqwest = { version = "0.12.9", features = ["json", "stream"] }
reqwest = { version = "0.12.9", features = ["stream", "multipart"] }
rpassword = "7.3.1"
rusqlite = { version = "0.32.1", features = ["bundled"] }
serde = { version = "1.0.215", features = ["derive"] }
serde_json = "1.0.133"
tempfile = "3.14.0"
thiserror = "2.0.3"
tokio = { version = "1.41.1", features = ["fs", "macros", "rt", "rt-multi-thread"] }
tokio-util = { version = "0.7.12", features = ["io"] }
Expand All @@ -32,4 +30,3 @@ tracing-appender = "0.2.3"
tracing-subscriber = "0.3.18"
url = "2.5.4"
urlencoding = "2.1.3"
zstd = "0.13.2"
46 changes: 31 additions & 15 deletions src/git_lfs/git_lfs_parser.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::io;

use anyhow::{bail, Result};
use anyhow::{bail, Context, Result};
use serde::{Deserialize, Serialize};
use tracing::{info, warn};
use tracing::{info, warn, error};

use super::CustomTransferAgent;

Expand All @@ -26,18 +26,14 @@ impl GitLfsProgressReporter {
}
}

pub fn update(&mut self, progress: f64) -> Result<()> {
let progress_bytes = if progress < 1.0 {
(self.total_bytes as f64 * progress).floor() as usize
}
else {
self.total_bytes
};
pub fn update(&mut self, bytes_since_last: usize) -> Result<()> {
self.bytes_since_last = bytes_since_last;
self.bytes_so_far += bytes_since_last;

self.bytes_since_last = progress_bytes - self.bytes_so_far;
self.bytes_so_far = progress_bytes;
let progress_json = serde_json::to_string(self)?;

println!("{}", serde_json::to_string(self)?);
info!("Reporting progress: \"{}\".", progress_json);
println!("{}", progress_json);
Ok(())
}
}
Expand All @@ -50,7 +46,25 @@ pub fn error_init(code: u32, message: &str) -> Result<()> {
}
};

println!("{}", serde_json::to_string(&error_json)?);
let error_json = serde_json::to_string(&error_json)?;

error!("Reporting error: \"{}\".", error_json);
println!("{}", error_json);
Ok(())
}

pub fn complete_upload(oid: &str) -> Result<()> {
let complete_json = EventJson {
event: "complete".to_string(),
oid: Some(oid.to_string()),
path: None,
size: None
};

let complete_json = serde_json::to_string(&complete_json)?;

info!("Reporting complete: \"{}\".", complete_json);
println!("{}", complete_json);
Ok(())
}

Expand Down Expand Up @@ -163,11 +177,13 @@ impl<'custom_transfer_agent, T: CustomTransferAgent> GitLfsParser<'custom_transf
match event.event {
EventType::Download => {
info!("Calling download on custom transfer agent.");
self.custom_transfer_agent.download(&event).await?
self.custom_transfer_agent.download(&event).await?;
},
EventType::Upload => {
info!("Calling upload on custom transfer agent.");
self.custom_transfer_agent.upload(&event).await?
self.custom_transfer_agent.upload(&event).await?;

complete_upload(event.oid.context("OID should not be null")?.as_str())?;
},
EventType::Terminate => {
info!("Calling terminate on custom transfer agent.");
Expand Down
7 changes: 4 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Result;
use clap::{Arg, Command};
use tracing::error;
use tracing::{error, info};
use users_dirs::get_config_dir;
use tracing_appender::rolling;
use tracing_subscriber::fmt::writer::MakeWriterExt;
Expand Down Expand Up @@ -103,11 +103,12 @@ async fn main() -> Result<()> {
}
};

info!("Exiting...");

match result {
Ok(_) => Ok(()),
Err(error) => {
error!("An error bubbled to the main method: \"{}\".", error);

error!("An error bubbled to the main method: \"{:?}\".", error);
Err(error)
}
}
Expand Down
91 changes: 13 additions & 78 deletions src/subcommands/main_subcommand.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,20 @@
use std::{fs::File, io::{Read, Write}};

use anyhow::{Context, Result};
use clap::ArgMatches;
use named_lock::NamedLock;
use tracing::info;
use zstd::Encoder;

use crate::{configuration::Configuration, credential_manager::CredentialManager, git_lfs::{error_init, CustomTransferAgent, Event, GitLfsParser, GitLfsProgressReporter}, synology_api::{ProgressReporter, SynologyFileStation}};

use super::Subcommand;

#[derive(Debug)]
struct StdOutProgressReporter {
git_lfs_progress_reporter: GitLfsProgressReporter,
total_bytes: usize
git_lfs_progress_reporter: GitLfsProgressReporter
}

impl ProgressReporter for StdOutProgressReporter {
fn update(&mut self, bytes_so_far: usize) -> Result<()> {
let progress = 0.9 * bytes_so_far as f64 / self.total_bytes as f64;
self.git_lfs_progress_reporter.update(progress)
fn update(&mut self, bytes_since_last: usize) -> Result<()> {
self.git_lfs_progress_reporter.update(bytes_since_last)
}
}

Expand Down Expand Up @@ -73,34 +69,22 @@ impl CustomTransferAgent for MainSubcommand {
async fn upload(&mut self, event: &Event) -> Result<()> {
let configuration = Configuration::load()?;

let mut git_lfs_progress_reporter = GitLfsProgressReporter::new(
let git_lfs_progress_reporter = GitLfsProgressReporter::new(
event.size.clone().context("Size should not be null")?,
event.oid.clone().context("oid should not be null")?);

let path = event.path.clone().context("Path should not be null.")?;
let (compressed, file) = self.compress_file(path.as_str(), event.size.context("Size should not be null")?, &mut git_lfs_progress_reporter)?;
let source_path = event.path.clone().context("Path should not be null.")?;
info!("Preparing to upload file at \"{}\".", source_path);
info!("Pushing to server path: \"{}\".", configuration.path);

let target_file_name = if compressed {
format!("{}.zst", event.oid.clone().context("oid should not be null")?)
}
else {
event.oid.clone().context("oid should not be null")?
let progress_reporter = StdOutProgressReporter {
git_lfs_progress_reporter
};

let path = format!(
"{}/{}",
configuration.path,
target_file_name
);

// let mut progress_reporter = StdOutProgressReporter {
// git_lfs_progress_reporter
// };

// let file_station = self.file_station.clone().context("File Station should not be null")?;
// file_station.upload(file, path.as_str(), false, false, None, None, None, Some(&mut progress_reporter)).await?;
// Upload either the uncompressed blob or the original to the nas - 90%
let file_station = self.file_station.clone().context("File Station should not be null")?;
file_station.upload(source_path.as_str(), event.size.clone().context("Size should not be null")?, configuration.path.as_str(), false, false, None, None, None, Some(progress_reporter)).await?;

info!("Upload finished.");
Ok(())
}
}
Expand All @@ -123,55 +107,6 @@ impl MainSubcommand {
}
}

#[tracing::instrument]
fn compress_file(&self, path: &str, size: usize, progress_reporter: &mut GitLfsProgressReporter) -> Result<(bool, File)> {
const BYTES_TO_KB: usize = 1024;
const KB_TO_MB: usize = 1024;
const BYTES_TO_MB: usize = BYTES_TO_KB * KB_TO_MB;
const CHUNK_SIZE: usize = 4 * BYTES_TO_MB;

let chunk_count = (size as f64 / CHUNK_SIZE as f64).ceil() as u64;

info!("Compressing file. We have {} chunks.", chunk_count);

let mut source = File::open(path)?;

let target = tempfile::tempfile()?;
let mut encoder = Encoder::new(&target, 0)?;

let mut compressible = true;
let mut buffer = [0; CHUNK_SIZE];
for i in 0..chunk_count {
let count = source.read(&mut buffer)?;
let compressed_size = encoder.write(&buffer[..count])?;

if i == 0 && compressed_size < count {
info!("File is not compressible, aborting compression.");

compressible = false;
break // We are not compressible
}

let progress = 0.1 * (i + 1) as f64 / chunk_count as f64;
progress_reporter.update(progress)?;
}

let progress = 1.0 / 10.0;
progress_reporter.update(progress)?;

if compressible {
info!("Finished compressing.");

encoder.finish()?;
Ok((compressible, target))
}
else {
info!("Compression is not possible");

Ok((compressible, source))
}
}

#[tracing::instrument]
async fn create_folder(&self, path: &str) -> Result<()> {
let configuration = Configuration::load()?;
Expand Down
Loading

0 comments on commit 7485110

Please sign in to comment.