diff --git a/pyplink/tests/test_pyplink.py b/pyplink/tests/test_pyplink.py index 978a7ca..23a798a 100644 --- a/pyplink/tests/test_pyplink.py +++ b/pyplink/tests/test_pyplink.py @@ -26,16 +26,26 @@ from __future__ import print_function import os +import stat import random import shutil +import zipfile +import platform import unittest from tempfile import mkdtemp +from distutils.spawn import find_executable +from subprocess import check_call, PIPE, CalledProcessError try: from itertools import zip_longest as zip except ImportError: from itertools import izip_longest as zip +try: + from urllib.request import urlretrieve +except ImportError: + from urllib import urlretrieve + from pkg_resources import resource_filename import numpy as np @@ -51,7 +61,7 @@ class TestPyPlink(unittest.TestCase): @classmethod def setUpClass(cls): # Creating a temporary directory - cls.tmp_dir = mkdtemp(prefix="beelinetools_test_") + cls.tmp_dir = mkdtemp(prefix="pyplink_test_") # Getting the BED/BIM/FAM files cls.bed = resource_filename( @@ -548,9 +558,9 @@ def test_write_binary(self): test_prefix = os.path.join(self.tmp_dir, "test_write") # Writing the binary file - with PyPlink(test_prefix, "w") as plink: + with PyPlink(test_prefix, "w") as pedfile: for genotypes in expected_genotypes: - plink.write_marker(genotypes) + pedfile.write_marker(genotypes) # Checking the file exists self.assertTrue(os.path.isfile(test_prefix + ".bed")) @@ -568,8 +578,8 @@ def test_write_binary(self): file=o_file) # Reading the written binary file - plink = PyPlink(test_prefix) - for i, (marker, genotypes) in enumerate(plink): + pedfile = PyPlink(test_prefix) + for i, (marker, genotypes) in enumerate(pedfile): self.assertEqual("m{}".format(i+1), marker) self.assertTrue((expected_genotypes[i] == genotypes).all()) @@ -587,9 +597,9 @@ def test_write_binary_error(self): # Writing the binary file with self.assertRaises(ValueError) as cm: - with PyPlink(test_prefix, "w") as plink: + with PyPlink(test_prefix, "w") as pedfile: for genotypes in expected_genotypes: - plink.write_marker(genotypes) + pedfile.write_marker(genotypes) self.assertEqual("7 samples expected, got 6", str(cm.exception)) def test_grouper_padding(self): @@ -613,3 +623,156 @@ def test_grouper_no_padding(self): observed_chunks = PyPlink._grouper(range(10), 5) for expected, observed in zip(expected_chunks, observed_chunks): self.assertEqual(expected, observed) + + @unittest.skipIf(platform.system() not in {"Darwin", "Linux", "Windows"}, + "Plink not available for {}".format(platform.system())) + def test_with_plink(self): + """Tests to read a binary file using Plink.""" + # Checking if Plink is in the path + plink_path = "plink" + if platform.system() == "Windows": + plink_path += ".exe" + + if find_executable(plink_path) is None: + # The url for each platform + url = "http://pngu.mgh.harvard.edu/~purcell/plink/dist/{filename}" + + # Getting the name of the file + filename = "" + if platform.system() == "Windows": + filename = "plink-1.07-dos.zip" + elif platform.system() == "Darwin": + filename = "plink-1.07-mac-intel.zip" + elif platform.system() == "Linux": + if platform.architecture()[0].startswith("32"): + filename = "plink-1.07-i686.zip" + elif platform.architecture()[0].startswith("64"): + filename = "plink-1.07-x86_64.zip" + else: + self.skipTest("System not compatible for Plink") + else: + self.skipTest("System not compatible for Plink") + + # Downloading Plink + zip_path = os.path.join(self.tmp_dir, filename) + try: + urlretrieve( + url.format(filename=filename), + zip_path, + ) + except: + self.skipTest("Plink's URL is not available") + + # Unzipping Plink + plink_dir = os.path.join(self.tmp_dir, ) + with zipfile.ZipFile(zip_path, "r") as z: + z.extractall(self.tmp_dir) + plink_path = os.path.join( + self.tmp_dir, os.path.splitext(filename)[0], + plink_path, + ) + if not os.path.isfile(plink_path): + self.skipTest("Cannot use Plink") + + # Making the script executable + if platform.system() in {"Darwin", "Linux"}: + os.chmod(plink_path, stat.S_IRWXU) + + # Testing Plink works + try: + check_call([ + plink_path, + "--noweb", + "--help", + "--out", os.path.join(self.tmp_dir, "execution_test") + ], stdout=PIPE, stderr=PIPE) + except CalledProcessError: + self.skipTest("Plink cannot be properly used") + except IOError: + self.skipTest("Plink was not properly installed") + + # Creating the BED file + all_genotypes = [ + [0, 1, 0, 0, -1, 0, 1, 0, 0, 2], + [2, 1, 2, 2, 2, 2, 2, 1, 0, 1], + [0, 0, 0, 0, 0, 1, 0, 0, 0, 0], + ] + prefix = os.path.join(self.tmp_dir, "test_output") + with PyPlink(prefix, "w") as pedfile: + for genotypes in all_genotypes: + pedfile.write_marker(genotypes) + + # Creating the FAM file + fam_content = [ + ["F0", "S0", "0", "0", "1", "-9"], + ["F1", "S1", "0", "0", "1", "-9"], + ["F2", "S2", "0", "0", "2", "-9"], + ["F3", "S3", "0", "0", "1", "-9"], + ["F4", "S4", "0", "0", "1", "-9"], + ["F5", "S5", "0", "0", "2", "-9"], + ["F6", "S6", "0", "0", "1", "-9"], + ["F7", "S7", "0", "0", "0", "-9"], + ["F8", "S8", "0", "0", "1", "-9"], + ["F9", "S9", "0", "0", "2", "-9"], + ] + with open(prefix + ".fam", "w") as o_file: + for sample in fam_content: + print(*sample, sep=" ", file=o_file) + + # Creating the BIM file + bim_content = [ + ["1", "M0", "0", "123", "A", "G"], + ["1", "M1", "0", "124", "C", "T"], + ["2", "M2", "0", "117", "G", "C"], + ] + with open(prefix + ".bim", "w") as o_file: + for marker in bim_content: + print(*marker, sep="\t", file=o_file) + + # Creating a transposed pedfile using Plink + out_prefix = prefix + "_transposed" + try: + check_call([ + plink_path, + "--noweb", + "--bfile", prefix, + "--recode", "--transpose", "--tab", + "--out", out_prefix, + ], stdout=PIPE, stderr=PIPE) + except CalledProcessError: + self.fail("Plink could not recode file") + + # Checking the two files exists + self.assertTrue(os.path.isfile(out_prefix + ".tped")) + self.assertTrue(os.path.isfile(out_prefix + ".tfam")) + + # Checking the content of the TFAM file + expected = "\n".join("\t".join(sample) for sample in fam_content) + with open(out_prefix + ".tfam", "r") as i_file: + self.assertEqual(expected + "\n", i_file.read()) + + # Checking the content of the TPED file + with open(out_prefix + ".tped", "r") as i_file: + # Checking the first marker + marker_1 = i_file.readline().rstrip("\r\n").split("\t") + self.assertEqual(["1", "M0", "0", "123"], marker_1[:4]) + self.assertEqual(["G G", "A G", "G G", "G G", "0 0", "G G", "A G", + "G G", "G G", "A A"], + marker_1[4:]) + + # Checking the second marker + marker_2 = i_file.readline().rstrip("\r\n").split("\t") + self.assertEqual(["1", "M1", "0", "124"], marker_2[:4]) + self.assertEqual(["C C", "T C", "C C", "C C", "C C", "C C", "C C", + "T C", "T T", "T C"], + marker_2[4:]) + + # Checking the third marker + marker_3 = i_file.readline().rstrip("\r\n").split("\t") + self.assertEqual(["2", "M2", "0", "117"], marker_3[:4]) + self.assertEqual(["C C", "C C", "C C", "C C", "C C", "G C", "C C", + "C C", "C C", "C C"], + marker_3[4:]) + + # Checking this is the end of the file + self.assertEqual("", i_file.readline())