Commit

linting
tkchafin committed Jul 12, 2024
1 parent 9937493 commit 6d0763e
Showing 1 changed file with 43 additions and 106 deletions.
149 changes: 43 additions & 106 deletions bin/create_table.py
@@ -10,43 +10,18 @@


 def parse_args(args=None):
-    Description = (
-        "Create a table by parsing json output to extract N50, "
-        "BUSCO, QV and COMPLETENESS stats."
-    )
+    Description = "Create a table by parsing json output to extract N50, " "BUSCO, QV and COMPLETENESS stats."
 
     parser = argparse.ArgumentParser(description=Description)
-    parser.add_argument(
-        "--genome", required=True,
-        help="Input NCBI genome summary JSON file."
-    )
-    parser.add_argument(
-        "--sequence", required=True,
-        help="Input NCBI sequence summary JSON file."
-    )
-    parser.add_argument(
-        "--busco", help="Input BUSCO short summary JSON file."
-    )
-    parser.add_argument(
-        "--qv", nargs="*", help="Input QV TSV file from MERQURYFK."
-    )
-    parser.add_argument(
-        "--completeness", nargs="*",
-        help="Input COMPLETENESS stats TSV file from MERQURYFK."
-    )
-    parser.add_argument(
-        "--hic", action="append", help="HiC sample ID used for contact maps."
-    )
-    parser.add_argument(
-        "--flagstat", action="append",
-        help="HiC flagstat file created by Samtools."
-    )
-    parser.add_argument(
-        "--outcsv", required=True, help="Output CSV file."
-    )
-    parser.add_argument(
-        "--version", action="version", version="%(prog)s 3.1"
-    )
+    parser.add_argument("--genome", required=True, help="Input NCBI genome summary JSON file.")
+    parser.add_argument("--sequence", required=True, help="Input NCBI sequence summary JSON file.")
+    parser.add_argument("--busco", help="Input BUSCO short summary JSON file.")
+    parser.add_argument("--qv", nargs="*", help="Input QV TSV file from MERQURYFK.")
+    parser.add_argument("--completeness", nargs="*", help="Input COMPLETENESS stats TSV file from MERQURYFK.")
+    parser.add_argument("--hic", action="append", help="HiC sample ID used for contact maps.")
+    parser.add_argument("--flagstat", action="append", help="HiC flagstat file created by Samtools.")
+    parser.add_argument("--outcsv", required=True, help="Output CSV file.")
+    parser.add_argument("--version", action="version", version="%(prog)s 3.1")
     return parser.parse_args(args)


@@ -103,81 +78,55 @@ def ncbi_stats(genome_in, seq_in, writer):
     writer.writerow(["Accession", data.get("accession", math.nan)])
     writer.writerow(["Common_Name", organism.get("common_name", math.nan)])
     writer.writerow(["Organism_Name", organism.get("organism_name", math.nan)])
-    tol_id = "".join(
-        pairs.get("value", "")
-        for pairs in attr if pairs.get("name") == "tolid"
-    )
+    tol_id = "".join(pairs.get("value", "") for pairs in attr if pairs.get("name") == "tolid")
     writer.writerow(["ToL_ID", tol_id if tol_id else math.nan])
     writer.writerow(["Taxon_ID", organism.get("tax_id", math.nan)])
     writer.writerow(["Assembly_Name", info.get("assembly_name", math.nan)])
     writer.writerow(["Assembly_Level", info.get("assembly_level", math.nan)])
-    life_stage = "".join(
-        pairs.get("value", "")
-        for pairs in attr if pairs.get("name") == "life_stage"
-    )
+    life_stage = "".join(pairs.get("value", "") for pairs in attr if pairs.get("name") == "life_stage")
     writer.writerow(["Life_Stage", life_stage if life_stage else math.nan])
-    tissue = "".join(
-        pairs.get("value", "")
-        for pairs in attr if pairs.get("name") == "tissue"
-    )
+    tissue = "".join(pairs.get("value", "") for pairs in attr if pairs.get("name") == "tissue")
     writer.writerow(["Tissue", tissue if tissue else math.nan])
-    sex = "".join(
-        pairs.get("value", "")
-        for pairs in attr if pairs.get("name") == "sex"
-    )
+    sex = "".join(pairs.get("value", "") for pairs in attr if pairs.get("name") == "sex")
     writer.writerow(["Sex", sex if sex else math.nan])
 
     # Write assembly statistics
     writer.writerow(["##Assembly_Statistics"])
-    writer.writerow([
-        "Total_Sequence", stats.get("total_sequence_length", math.nan)
-    ])
-    writer.writerow([
-        "Chromosomes", stats.get("total_number_of_chromosomes", math.nan)
-    ])
-    writer.writerow([
-        "Scaffolds", stats.get("number_of_scaffolds", math.nan)
-    ])
-    writer.writerow([
-        "Scaffold_N50", stats.get("scaffold_n50", math.nan)
-    ])
-    writer.writerow([
-        "Contigs", stats.get("number_of_contigs", math.nan)
-    ])
-    writer.writerow([
-        "Contig_N50", stats.get("contig_n50", math.nan)
-    ])
-    writer.writerow([
-        "GC_Percent", stats.get("gc_percent", math.nan)
-    ])
+    writer.writerow(["Total_Sequence", stats.get("total_sequence_length", math.nan)])
+    writer.writerow(["Chromosomes", stats.get("total_number_of_chromosomes", math.nan)])
+    writer.writerow(["Scaffolds", stats.get("number_of_scaffolds", math.nan)])
+    writer.writerow(["Scaffold_N50", stats.get("scaffold_n50", math.nan)])
+    writer.writerow(["Contigs", stats.get("number_of_contigs", math.nan)])
+    writer.writerow(["Contig_N50", stats.get("contig_n50", math.nan)])
+    writer.writerow(["GC_Percent", stats.get("gc_percent", math.nan)])
 
     chromosome_header = False
     for mol in seq:
-        if mol.get("gc_percent") is not None and \
-                mol.get("assembly_unit") != "non-nuclear":
+        if mol.get("gc_percent") is not None and mol.get("assembly_unit") != "non-nuclear":
             if not chromosome_header:
                 writer.writerow(["##Chromosome", "Length", "GC_Percent"])
                 chromosome_header = True
-            writer.writerow([
-                mol.get("chr_name", math.nan),
-                round(mol.get("length", 0) / 1000000, 2)
-                if mol.get("length") is not None else math.nan,
-                mol.get("gc_percent", math.nan),
-            ])
+            writer.writerow(
+                [
+                    mol.get("chr_name", math.nan),
+                    round(mol.get("length", 0) / 1000000, 2) if mol.get("length") is not None else math.nan,
+                    mol.get("gc_percent", math.nan),
+                ]
+            )
 
     organelle_header = False
     for mol in seq:
-        if mol.get("gc_percent") is not None and \
-                mol.get("assembly_unit") == "non-nuclear":
+        if mol.get("gc_percent") is not None and mol.get("assembly_unit") == "non-nuclear":
            if not organelle_header:
                writer.writerow(["##Organelle", "Length", "GC_Percent"])
                organelle_header = True
-            writer.writerow([
-                mol.get("assigned_molecule_location_type", math.nan),
-                round(mol.get("length", 0) / 1000000, 2)
-                if mol.get("length") is not None else math.nan,
-                mol.get("gc_percent", math.nan),
-            ])
+            writer.writerow(
+                [
+                    mol.get("assigned_molecule_location_type", math.nan),
+                    round(mol.get("length", 0) / 1000000, 2) if mol.get("length") is not None else math.nan,
+                    mol.get("gc_percent", math.nan),
+                ]
+            )
 
 
 def extract_busco(file_in, writer):
@@ -191,9 +140,7 @@ def extract_busco(file_in, writer):
     with open(file_in, "r") as fin:
         data = json.load(fin)
 
-    lineage_dataset_name = data.get(
-        "lineage_dataset", {}
-    ).get("name", math.nan)
+    lineage_dataset_name = data.get("lineage_dataset", {}).get("name", math.nan)
     results_summary = data.get("results", {}).get("one_line_summary", math.nan)
 
     writer.writerow(["##BUSCO", lineage_dataset_name])
@@ -220,32 +167,24 @@ def extract_pacbio(qv, completeness, writer):
             for row in data:
                 if float(row["QV"]) > qval:
                     qval = float(row["QV"])
-                    qv_name = remove_sample_T_suffix(
-                        os.path.basename(f).removesuffix(".qv")
-                    )
+                    qv_name = remove_sample_T_suffix(os.path.basename(f).removesuffix(".qv"))
     assert qv_name is not None, "No QV values found in %s" % qv
 
     # The completeness has to be from the same specimen as the QV value
     matching_completeness_files = []
     for h in completeness:
-        comp_name = remove_sample_T_suffix(
-            os.path.basename(h).removesuffix(".completeness.stats")
-        )
+        comp_name = remove_sample_T_suffix(os.path.basename(h).removesuffix(".completeness.stats"))
         if comp_name == qv_name:
             matching_completeness_files.append(h)
-    assert matching_completeness_files, (
-        "No completeness files (%s) match for %s" % (completeness, qv_name)
-    )
+    assert matching_completeness_files, "No completeness files (%s) match for %s" % (completeness, qv_name)
 
     comp = None
     for h in matching_completeness_files:
         with open(h, "r") as fin:
             data = csv.DictReader(fin, delimiter="\t")
             for row in data:
                 comp = float(row["% Covered"])
-    assert comp is not None, (
-        "No completeness values found in %s" % matching_completeness_files
-    )
+    assert comp is not None, "No completeness values found in %s" % matching_completeness_files
 
     writer.writerow(["##MerquryFK", qv_name])
     writer.writerow(["QV", qval])
@@ -266,9 +205,7 @@ def extract_mapped(sample, file_in, writer):
     with open(file_in, "r") as fin:
         for line in fin:
             if "primary mapped" in line:
-                writer.writerow(
-                    ["Primary_Mapped", re.search(r"\((.*?) :", line).group(1)]
-                )
+                writer.writerow(["Primary_Mapped", re.search(r"\((.*?) :", line).group(1)])
 
 
 def main(args=None):
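For reference, a minimal sketch of how the collapsed argparse block might be exercised after this change. The import path and every file name below are placeholders chosen for illustration, not inputs taken from the pipeline:

    # Hypothetical smoke test for parse_args(); file names are placeholders.
    from create_table import parse_args  # assumes bin/ is on PYTHONPATH

    args = parse_args(
        [
            "--genome", "genome_summary.json",
            "--sequence", "sequence_summary.json",
            "--qv", "sampleT1.qv",
            "--completeness", "sampleT1.completeness.stats",
            "--outcsv", "summary.csv",
        ]
    )
    print(args.genome, args.qv, args.outcsv)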
