Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added error handling to multiple functions in R/assign_job_queue.R, R/blastWrappers.R #76

Merged
merged 8 commits into from
Oct 30, 2024
267 changes: 200 additions & 67 deletions R/acc2lin.R
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,28 @@
#' Sink Reset
#'
#' @return No return, but run to close all outstanding `sink()`s
#' and handles any errors or warnings that occur during the process.
#'
#' @export
#'
#' @examples
#' \dontrun{
#' sinkReset()
#' }
sinkReset <- function() {
# Handle all errors and warnings
tryCatch({
for (i in seq_len(sink.number())) {
sink(NULL)
sink(NULL)
}
print("All sinks closed")
}, error = function(e) {
print(paste("Error: ", e$message))
}, warning = function(w) {
print(paste("Warning: ", w$message))
}, finally = {
print("resetSink function execution completed.")
})
}


Expand All @@ -44,39 +56,79 @@ sinkReset <- function() {
#' addLineage()
#' }
addLineage <- function(df, acc_col = "AccNum", assembly_path,
lineagelookup_path, ipgout_path = NULL, plan = "sequential", ...) {
s_acc_col <- sym(acc_col)
accessions <- df %>% pull(acc_col)
lins <- acc2Lineage(accessions, assembly_path, lineagelookup_path, ipgout_path, plan)
lineagelookup_path, ipgout_path = NULL,
plan = "sequential", ...) {
# check for validate inputs
if (!is.data.frame(df)) {
stop("Input 'df' must be a data frame.")
}

if (!acc_col %in% colnames(df)) {
stop(paste("Column", acc_col, "not found in data frame."))
}

# Ensure paths are character strings
if (!is.character(assembly_path) || !is.character(lineagelookup_path)) {
stop("Both 'assembly_path' and
'lineagelookup_path' must be character strings.")
}

# Ensure paths exist
if (!file.exists(assembly_path)) {
stop(paste("Assembly file not found at:", assembly_path))
}

# Drop a lot of the unimportant columns for now? will make merging much easier
lins <- lins[, c(
if (!file.exists(lineagelookup_path)) {
stop(paste("Lineage lookup file not found at:", lineagelookup_path))
}
tryCatch({
# Attempt to add lineages
acc_col <- sym(acc_col)
accessions <- df %>% pull(acc_col)
lins <- acc2Lineage(
accessions, assembly_path, lineagelookup_path, ipgout_path, plan
)

# Drop a lot of the unimportant columns for now?
# will make merging much easier
lins <- lins[, c(
"Strand", "Start", "Stop", "Nucleotide Accession", "Source",
"Id", "Strain"
) := NULL]
lins <- unique(lins)
) := NULL]
lins <- unique(lins)

# dup <- lins %>% group_by(Protein) %>%
# summarize(count = n()) %>% filter(count > 1) %>%
# pull(Protein)

# dup <- lins %>% group_by(Protein) %>% summarize(count = n()) %>% filter(count > 1) %>%
# pull(Protein)
merged <- merge(df, lins, by.x = acc_col, by.y = "Protein", all.x = TRUE)
return(merged)
}, error = function(e) {
print(paste("Error: ", e$message))
}, warning = function(w) {
print(paste("Warning: ", w$message))
}, finally = {
print("addLineages function execution completed.")
})

merged <- merge(df, lins, by.x = acc_col, by.y = "Protein", all.x = TRUE)
return(merged)
}


#' acc2Lineage
#'
#' @author Samuel Chen, Janani Ravi
#'
#' @description This function combines 'efetchIPG()' and 'IPG2Lineage()' to map a set
#' @description This function combines 'efetchIPG()'
#' and 'IPG2Lineage()' to map a set
#' of protein accessions to their assembly (GCA_ID), tax ID, and lineage.
#'
#' @param accessions Character vector of protein accessions
#' @param assembly_path String of the path to the assembly_summary path
#' This file can be generated using the \link[MolEvolvR]{downloadAssemblySummary} function
#' @param lineagelookup_path String of the path to the lineage lookup file
#' (taxid to lineage mapping). This file can be generated using the
#' @param ipgout_path Path to write the results of the efetch run of the accessions
#' @param ipgout_path Path to write the results
#' of the efetch run of the accessions
#' on the ipg database. If NULL, the file will not be written. Defaults to NULL
#' @param plan
#'
Expand All @@ -87,27 +139,43 @@ addLineage <- function(df, acc_col = "AccNum", assembly_path,
#' \dontrun{
#' acc2Lineage()
#' }
acc2Lineage <- function(accessions, assembly_path, lineagelookup_path, ipgout_path = NULL, plan = "sequential", ...) {
tmp_ipg <- F
if (is.null(ipgout_path)) {
tmp_ipg <- T
ipgout_path <- tempfile("ipg", fileext = ".txt")
}
acc2Lineage <- function(accessions, assembly_path,
lineagelookup_path, ipgout_path = NULL,
plan = "sequential", ...) {
tmp_ipg <- F
if (is.null(ipgout_path)) {
tmp_ipg <- T
ipgout_path <- tempfile("ipg", fileext = ".txt")
}

lins <- NULL
tryCatch({
# Attempt to fetch IPG
efetchIPG(accessions, out_path = ipgout_path, plan)

# Attempt to process IPG to lineages
lins <- IPG2Lineage(accessions, ipgout_path, assembly_path, lineagelookup_path)
}, error = function(e) {
print(paste("An error occurred: ", e$message))
}, warning = function(w) {
print(paste("Warning: ", w$message))
}, finally = {
print("acc2lin function execution completed.")
})

if (tmp_ipg) {
unlink(tempdir(), recursive = T)
}
return(lins)
if (tmp_ipg) {
unlink(tempdir(), recursive = T)
}
return(lins)
}


#' efetchIPG
#'
#' @author Samuel Chen, Janani Ravi
#'
#' @description Perform efetch on the ipg database and write the results to out_path
#' @description Perform efetch on the ipg database
#' and write the results to out_path
#'
#' @param accnums Character vector containing the accession numbers to query on
#' the ipg database
Expand All @@ -126,57 +194,84 @@ acc2Lineage <- function(accessions, assembly_path, lineagelookup_path, ipgout_pa
#' efetchIPG()
#' }
efetchIPG <- function(accnums, out_path, plan = "sequential", ...) {
if (length(accnums) > 0) {
partition <- function(in_data, groups) {
# \\TODO This function should be defined outside of efetchIPG(). It can be non-exported/internal
# Partition data to limit number of queries per second for rentrez fetch:
# limit of 10/second w/ key
l <- length(in_data)

partitioned <- list()
for (i in 1:groups)
{
partitioned[[i]] <- in_data[seq.int(i, l, groups)]
}

return(partitioned)
}
# Argument validation
if (!is.character(accnums) || length(accnums) == 0) {
stop("Error: 'accnums' must be a non-empty character vector.")
}

if (!is.character(out_path) || nchar(out_path) == 0) {
stop("Error: 'out_path' must be a non-empty string.")
}

if (!is.function(plan)) {
stop("Error: 'plan' must be a valid plan function.")
}
if (length(accnums) > 0) {
partition <- function(in_data, groups) {
# \\TODO This function should be defined outside of efetchIPG().
# It can be non-exported/internal
# Partition data to limit number of queries per second for rentrez fetch:
# limit of 10/second w/ key
l <- length(in_data)

plan(strategy = plan, .skip = T)


min_groups <- length(accnums) / 200
groups <- min(max(min_groups, 15), length(accnums))
partitioned_acc <- partition(accnums, groups)
sink(out_path)

a <- future_map(1:length(partitioned_acc), function(x) {
# Avoid hitting the rate API limit
if (x %% 9 == 0) {
Sys.sleep(1)
}
cat(
entrez_fetch(
id = partitioned_acc[[x]],
db = "ipg",
rettype = "xml",
api_key = "YOUR_KEY_HERE" ## Can this be included in public package?
)
)
})
sink(NULL)
partitioned <- list()
for (i in 1:groups){
partitioned[[i]] <- in_data[seq.int(i, l, groups)]
}

return(partitioned)
}
tryCatch({
# Set the future plan strategy
plan(strategy = plan, .skip = T)


min_groups <- length(accnums) / 200
groups <- min(max(min_groups, 15), length(accnums))
partitioned_acc <- partition(accnums, groups)

# Open the sink to the output path
sink(out_path)

a <- future_map(1:length(partitioned_acc), function(x) {
# Avoid hitting the rate API limit
if (x %% 9 == 0) {
Sys.sleep(1)
}
cat(
entrez_fetch(
id = partitioned_acc[[x]],
db = "ipg",
rettype = "xml",
api_key = "YOUR_KEY_HERE" ## Can this be included in public package?
)
)
})
sink(NULL)
}, error = function(e) {
print(paste("An error occurred: ", e$message))
}, warning = function(w) {
print(paste("Warning: ", w$message))
}, finally = {
print("efetch_ipg function execution completed.")
})
}
}



#' IPG2Lineage
#'
#' @author Samuel Chen, Janani Ravi
#'
#' @description Takes the resulting file of an efetch run on the ipg database and
#' @description Takes the resulting file
#' of an efetch run on the ipg database and
#'
#' @param accessions Character vector of protein accessions
#' @param ipg_file Filepath to the file containing results of an efetch run on the
#' ipg database. The protein accession in 'accessions' should be contained in this
#' @param ipg_file Filepath to the file
#' containing results of an efetch run on the
#' ipg database. The protein accession in
#' 'accessions' should be contained in this
#' file
#' @param assembly_path String of the path to the assembly_summary path
#' This file can be generated using the \link[MolEvolvR]{downloadAssemblySummary} function
Expand All @@ -195,16 +290,54 @@ efetchIPG <- function(accnums, out_path, plan = "sequential", ...) {
#' }
#'
IPG2Lineage <- function(accessions, ipg_file, assembly_path, lineagelookup_path, ...) {
# Argument validation for accessions
if (!is.character(accessions) || length(accessions) == 0) {
stop("Input 'accessions' must be a non-empty character vector.")
}

# check for validate inputs
if (!is.character(ipg_file)) {
stop("Input 'ipg_file' must be a character string.")
}
# Ensure paths are character strings
if (!is.character(assembly_path) || !is.character(lineagelookup_path)) {
stop("Both 'assembly_path' and
'lineagelookup_path' must be character strings.")
}

# Ensure paths exist
if (!file.exists(assembly_path)) {
stop(paste("Assembly file not found at:", assembly_path))
}

if (!file.exists(lineagelookup_path)) {
stop(paste("Lineage lookup file not found at:", lineagelookup_path))
}

try({
# Attempt to read the IPG file
ipg_dt <- fread(ipg_file, sep = "\t", fill = T)

# Filter the IPG data table to only include the accessions
ipg_dt <- ipg_dt[Protein %in% accessions]

# Rename the 'Assembly' column to 'GCA_ID'
ipg_dt <- setnames(ipg_dt, "Assembly", "GCA_ID")

# Convert the IPG data table to a lineage data table
lins <- GCA2Lineage(prot_data = ipg_dt, assembly_path, lineagelookup_path)

# Filter out rows with missing lineage information
lins <- lins[!is.na(Lineage)] %>% unique()

return(lins)
}, error = function(e) {
print(paste("An error occurred: ", e$message))
}, warning = function(w) {
print(paste("Warning: ", w$message))
}, finally = {
print("ipg2lin function execution completed.")
})
}


Expand Down
Loading
Loading